diff --git a/datafusion-cli/Cargo.lock b/datafusion-cli/Cargo.lock index 1235c0b740bc..9fc22bf0830c 100644 --- a/datafusion-cli/Cargo.lock +++ b/datafusion-cli/Cargo.lock @@ -93,6 +93,7 @@ version = "0.16.0" source = "registry+https://github.com/rust-lang/crates.io-index" checksum = "ceb7c683b2f8f40970b70e39ff8be514c95b96fcb9c4af87e1ed2cb2e10801a0" dependencies = [ + "bzip2", "crc32fast", "digest", "lazy_static", @@ -110,6 +111,8 @@ dependencies = [ "thiserror", "typed-builder", "uuid", + "xz2", + "zstd", ] [[package]] @@ -1417,9 +1420,9 @@ checksum = "5443807d6dff69373d433ab9ef5378ad8df50ca6298caf15de6e52e24aaf54d5" [[package]] name = "errno" -version = "0.3.3" +version = "0.3.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "136526188508e25c6fef639d7927dfb3e0e3084488bf202267829cf7fc23dbdd" +checksum = "add4f07d43996f76ef320709726a556a9d4f965d9410d8d0271132d2f8293480" dependencies = [ "errno-dragonfly", "libc", @@ -2122,9 +2125,9 @@ dependencies = [ [[package]] name = "memchr" -version = "2.6.3" +version = "2.6.4" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "8f232d6ef707e1956a43342693d2a31e72989554d58299d7a88738cc95b0d35c" +checksum = "f665ee40bc4a3c5590afb1e9677db74a508659dfd71e126420da8274909a0167" [[package]] name = "mimalloc" @@ -2717,9 +2720,9 @@ checksum = "dbb5fb1acd8a1a18b3dd5be62d25485eb770e05afb408a9627d14d451bae12da" [[package]] name = "reqwest" -version = "0.11.20" +version = "0.11.22" source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "3e9ad3fe7488d7e34558a2033d45a0c90b72d97b4f80705666fea71472e2e6a1" +checksum = "046cd98826c46c2ac8ddecae268eb5c2e58628688a5fc7a2643704a73faba95b" dependencies = [ "base64", "bytes", @@ -2743,6 +2746,7 @@ dependencies = [ "serde", "serde_json", "serde_urlencoded", + "system-configuration", "tokio", "tokio-rustls 0.24.1", "tokio-util", @@ -3227,6 +3231,27 @@ dependencies = [ "unicode-ident", ] +[[package]] +name = "system-configuration" +version = "0.5.1" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ba3a3adc5c275d719af8cb4272ea1c4a6d668a777f37e115f6d11ddbc1c8e0e7" +dependencies = [ + "bitflags 1.3.2", + "core-foundation", + "system-configuration-sys", +] + +[[package]] +name = "system-configuration-sys" +version = "0.5.0" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "a75fb188eb626b924683e3b95e3a48e63551fcfb51949de2f06a9d91dbee93c9" +dependencies = [ + "core-foundation-sys", + "libc", +] + [[package]] name = "tempfile" version = "3.8.0" diff --git a/datafusion/physical-plan/src/lib.rs b/datafusion/physical-plan/src/lib.rs index aca10893db3d..3071fadcb1a2 100644 --- a/datafusion/physical-plan/src/lib.rs +++ b/datafusion/physical-plan/src/lib.rs @@ -17,6 +17,7 @@ //! Traits for physical query plan, supporting parallel execution for partitioned relations. +mod topk; mod visitor; pub use self::metrics::Metric; use self::metrics::MetricsSet; @@ -26,6 +27,7 @@ use self::{ pub use datafusion_common::{internal_err, ColumnStatistics, Statistics}; use datafusion_common::{plan_err, Result}; use datafusion_physical_expr::PhysicalSortExpr; +pub use topk::TopK; pub use visitor::{accept, visit_execution_plan, ExecutionPlanVisitor}; use arrow::datatypes::SchemaRef; diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 7d260d42d9cd..703f80d90d2b 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -26,6 +26,7 @@ use crate::metrics::{ }; use crate::sorts::merge::streaming_merge; use crate::stream::{RecordBatchReceiverStream, RecordBatchStreamAdapter}; +use crate::topk::TopK; use crate::{ DisplayAs, DisplayFormatType, Distribution, EmptyRecordBatchStream, ExecutionPlan, Partitioning, SendableRecordBatchStream, Statistics, @@ -765,7 +766,12 @@ impl DisplayAs for SortExec { let expr: Vec = self.expr.iter().map(|e| e.to_string()).collect(); match self.fetch { Some(fetch) => { - write!(f, "SortExec: fetch={fetch}, expr=[{}]", expr.join(",")) + write!( + f, + // TODO should this say topk? + "SortExec: fetch={fetch}, expr=[{}]", + expr.join(",") + ) } None => write!(f, "SortExec: expr=[{}]", expr.join(",")), } @@ -853,29 +859,54 @@ impl ExecutionPlan for SortExec { trace!("End SortExec's input.execute for partition: {}", partition); - let mut sorter = ExternalSorter::new( - partition, - input.schema(), - self.expr.clone(), - context.session_config().batch_size(), - self.fetch, - execution_options.sort_spill_reservation_bytes, - execution_options.sort_in_place_threshold_bytes, - &self.metrics_set, - context.runtime_env(), - ); + if let Some(fetch) = self.fetch.as_ref() { + let mut topk = TopK::try_new( + partition, + input.schema(), + self.expr.clone(), + *fetch, + context.session_config().batch_size(), + context.runtime_env(), + &self.metrics_set, + partition, + )?; + + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(async move { + while let Some(batch) = input.next().await { + let batch = batch?; + topk.insert_batch(batch)?; + } + topk.emit() + }) + .try_flatten(), + ))) + } else { + let mut sorter = ExternalSorter::new( + partition, + input.schema(), + self.expr.clone(), + context.session_config().batch_size(), + self.fetch, + execution_options.sort_spill_reservation_bytes, + execution_options.sort_in_place_threshold_bytes, + &self.metrics_set, + context.runtime_env(), + ); - Ok(Box::pin(RecordBatchStreamAdapter::new( - self.schema(), - futures::stream::once(async move { - while let Some(batch) = input.next().await { - let batch = batch?; - sorter.insert_batch(batch).await?; - } - sorter.sort() - }) - .try_flatten(), - ))) + Ok(Box::pin(RecordBatchStreamAdapter::new( + self.schema(), + futures::stream::once(async move { + while let Some(batch) = input.next().await { + let batch = batch?; + sorter.insert_batch(batch).await?; + } + sorter.sort() + }) + .try_flatten(), + ))) + } } fn metrics(&self) -> Option { @@ -1043,7 +1074,7 @@ mod tests { assert_eq!(result.len(), 1); let metrics = sort_exec.metrics().unwrap(); - let did_it_spill = metrics.spill_count().unwrap() > 0; + let did_it_spill = metrics.spill_count().unwrap_or(0) > 0; assert_eq!(did_it_spill, expect_spillage, "with fetch: {fetch:?}"); } Ok(()) diff --git a/datafusion/physical-plan/src/topk/mod.rs b/datafusion/physical-plan/src/topk/mod.rs new file mode 100644 index 000000000000..4638c0dcf264 --- /dev/null +++ b/datafusion/physical-plan/src/topk/mod.rs @@ -0,0 +1,644 @@ +// Licensed to the Apache Software Foundation (ASF) under one +// or more contributor license agreements. See the NOTICE file +// distributed with this work for additional information +// regarding copyright ownership. The ASF licenses this file +// to you under the Apache License, Version 2.0 (the +// "License"); you may not use this file except in compliance +// with the License. You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, +// software distributed under the License is distributed on an +// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +// KIND, either express or implied. See the License for the +// specific language governing permissions and limitations +// under the License. + +//! TopK: Combination of Sort / LIMIT + +use arrow::{ + compute::interleave, + row::{RowConverter, Rows, SortField}, +}; +use std::{cmp::Ordering, collections::BinaryHeap, sync::Arc}; + +use arrow_array::{Array, ArrayRef, RecordBatch}; +use arrow_schema::SchemaRef; +use datafusion_common::Result; +use datafusion_execution::{ + memory_pool::{MemoryConsumer, MemoryReservation}, + runtime_env::RuntimeEnv, +}; +use datafusion_physical_expr::PhysicalSortExpr; +use hashbrown::HashMap; + +use crate::{stream::RecordBatchStreamAdapter, SendableRecordBatchStream}; + +use super::metrics::{BaselineMetrics, Count, ExecutionPlanMetricsSet, MetricBuilder}; + +/// Global TopK +/// +/// # Background +/// +/// "Top K" is a common query optimization used for queries such as +/// "find the top 3 customers by revenue". The (simplified) SQL for +/// such a query might be: +/// +/// ```sql +/// SELECT customer_id, revenue FROM 'sales.csv' ORDER BY revenue DESC limit 3; +/// ``` +/// +/// The simple plan would be: +/// +/// ```sql +/// > explain SELECT customer_id, revenue FROM sales ORDER BY revenue DESC limit 3; +/// +--------------+----------------------------------------+ +/// | plan_type | plan | +/// +--------------+----------------------------------------+ +/// | logical_plan | Limit: 3 | +/// | | Sort: revenue DESC NULLS FIRST | +/// | | Projection: customer_id, revenue | +/// | | TableScan: sales | +/// +--------------+----------------------------------------+ +/// ``` +/// +/// While this plan produces the correct answer, it will fully sorts the +/// input before discarding everything other than the top 3 elements. +/// +/// The same answer can be produced by simply keeping track of the top +/// K=3 elements, reducing the total amount of required buffer memory. +/// +/// # Structure +/// +/// This operator tracks the top K items using a `TopKHeap`. +pub struct TopK { + /// schema of the output (and the input) + schema: SchemaRef, + /// Runtime metrics + metrics: TopKMetrics, + /// Reservation + reservation: MemoryReservation, + /// The target number of rows for output batches + batch_size: usize, + /// sort expressions + expr: Arc<[PhysicalSortExpr]>, + /// row converter, for sort keys + row_converter: RowConverter, + /// scratch space for converting rows + scratch_rows: Rows, + /// stores the top k values and their sort key values, in order + heap: TopKHeap, +} + +impl TopK { + /// Create a new [`TopK`] that stores the top `k` values, as + /// defined by the sort expressions in `expr`. + // TOOD: make a builder or some other nicer API to avoid the + // clippy warning + #[allow(clippy::too_many_arguments)] + pub fn try_new( + partition_id: usize, + schema: SchemaRef, + expr: Vec, + k: usize, + batch_size: usize, + runtime: Arc, + metrics: &ExecutionPlanMetricsSet, + partition: usize, + ) -> Result { + let reservation = MemoryConsumer::new(format!("TopK[{partition_id}]")) + .register(&runtime.memory_pool); + + let expr: Arc<[PhysicalSortExpr]> = expr.into(); + + let sort_fields: Vec<_> = expr + .iter() + .map(|e| { + Ok(SortField::new_with_options( + e.expr.data_type(&schema)?, + e.options, + )) + }) + .collect::>()?; + + // TODO there is potential to add special cases for single column sort fields + // to improve performance + let row_converter = RowConverter::new(sort_fields)?; + let scratch_rows = row_converter.empty_rows( + batch_size, + 20 * batch_size, // guestimate 20 bytes per row + ); + + Ok(Self { + schema: schema.clone(), + metrics: TopKMetrics::new(metrics, partition), + reservation, + batch_size, + expr, + row_converter, + scratch_rows, + heap: TopKHeap::new(k, batch_size, schema), + }) + } + + /// Insert `batch`, remembering if any of its values are among + /// the top k seen so far. + pub fn insert_batch(&mut self, batch: RecordBatch) -> Result<()> { + // Updates on drop + let _timer = self.metrics.baseline.elapsed_compute().timer(); + + let sort_keys: Vec = self + .expr + .iter() + .map(|expr| { + let value = expr.expr.evaluate(&batch)?; + Ok(value.into_array(batch.num_rows())) + }) + .collect::>>()?; + + // reuse existing `Rows` to avoid reallocations + let rows = &mut self.scratch_rows; + rows.clear(); + self.row_converter.append(rows, &sort_keys)?; + + // TODO make this algorithmically better?: + // Idea: filter out rows >= self.heap.max() early (before passing to `RowConverter`) + // this avoids some work and also might be better vectorizable. + let mut batch_entry = self.heap.register_batch(batch); + for (index, row) in rows.iter().enumerate() { + match self.heap.max() { + // heap has k items, and the new row is greater than the + // current max in the heap ==> it is not a new topk + Some(max_row) if row.as_ref() >= max_row.row() => {} + // don't yet have k items or new item is lower than the currently k low values + None | Some(_) => { + self.heap.add(&mut batch_entry, row, index); + self.metrics.row_replacements.add(1); + } + } + } + self.heap.insert_batch_entry(batch_entry); + + // conserve memory + self.heap.maybe_compact()?; + + // update memory reservation + self.reservation.try_resize(self.size())?; + Ok(()) + } + + /// Returns the top k results broken into `batch_size` [`RecordBatch`]es, consuming the heap + pub fn emit(self) -> Result { + let Self { + schema, + metrics, + reservation: _, + batch_size, + expr: _, + row_converter: _, + scratch_rows: _, + mut heap, + } = self; + let _timer = metrics.baseline.elapsed_compute().timer(); // time updated on drop + + let mut batch = heap.emit()?; + metrics.baseline.output_rows().add(batch.num_rows()); + + // break into record batches as needed + let mut batches = vec![]; + loop { + if batch.num_rows() < batch_size { + batches.push(Ok(batch)); + break; + } else { + batches.push(Ok(batch.slice(0, batch_size))); + let remaining_length = batch.num_rows() - batch_size; + batch = batch.slice(batch_size, remaining_length); + } + } + Ok(Box::pin(RecordBatchStreamAdapter::new( + schema, + futures::stream::iter(batches), + ))) + } + + /// return the size of memory used by this operator, in bytes + fn size(&self) -> usize { + std::mem::size_of::() + + self.row_converter.size() + + self.scratch_rows.size() + + self.heap.size() + } +} + +struct TopKMetrics { + /// metrics + pub baseline: BaselineMetrics, + + /// count of how many rows were replaced in the heap + pub row_replacements: Count, +} + +impl TopKMetrics { + fn new(metrics: &ExecutionPlanMetricsSet, partition: usize) -> Self { + Self { + baseline: BaselineMetrics::new(metrics, partition), + row_replacements: MetricBuilder::new(metrics) + .counter("row_replacements", partition), + } + } +} + +/// This structure keeps at most the *smallest* k items, using the +/// [arrow::row] format for sort keys. While it is called "topK" for +/// values like `1, 2, 3, 4, 5` the "top 3" really means the +/// *smallest* 3 , `1, 2, 3`, not the *largest* 3 `3, 4, 5`. +/// +/// Using the `Row` format handles things such as ascending vs +/// descending and nulls first vs nulls last. +struct TopKHeap { + /// The maximum number of elemenents to store in this heap. + k: usize, + /// The target number of rows for output batches + batch_size: usize, + /// Storage for up at most `k` items using a BinaryHeap. Reverserd + /// so that the smallest k so far is on the top + inner: BinaryHeap, + /// Storage the original row values (TopKRow only has the sort key) + store: RecordBatchStore, + /// The size of all owned data held by this heap + owned_bytes: usize, +} + +impl TopKHeap { + fn new(k: usize, batch_size: usize, schema: SchemaRef) -> Self { + assert!(k > 0); + Self { + k, + batch_size, + inner: BinaryHeap::new(), + store: RecordBatchStore::new(schema), + owned_bytes: 0, + } + } + + /// Register a [`RecordBatch`] with the heap, returning the + /// appropriate entry + pub fn register_batch(&mut self, batch: RecordBatch) -> RecordBatchEntry { + self.store.register(batch) + } + + /// Insert a [`RecordBatchEntry`] created by a previous call to + /// [`Self::register_batch`] into storage. + pub fn insert_batch_entry(&mut self, entry: RecordBatchEntry) { + self.store.insert(entry) + } + + /// Returns the largest value stored by the heap if there are k + /// items, otherwise returns None. Remember this structure is + /// keeping the "smallest" k values + fn max(&self) -> Option<&TopKRow> { + if self.inner.len() < self.k { + None + } else { + self.inner.peek() + } + } + + /// Adds `row` to this heap. If inserting this new item would + /// increase the size past `k`, removes the previously smallest + /// item. + fn add( + &mut self, + batch_entry: &mut RecordBatchEntry, + row: impl AsRef<[u8]>, + index: usize, + ) { + let batch_id = batch_entry.id; + batch_entry.uses += 1; + + assert!(self.inner.len() <= self.k); + let row = row.as_ref(); + + // Reuse storage for evicted item if possible + let new_top_k = if self.inner.len() == self.k { + let prev_min = self.inner.pop().unwrap(); + + // Update batch use + if prev_min.batch_id == batch_entry.id { + batch_entry.uses -= 1; + } else { + self.store.unuse(prev_min.batch_id); + } + + // update memory accounting + self.owned_bytes -= prev_min.owned_size(); + prev_min.with_new_row(row, batch_id, index) + } else { + TopKRow::new(row, batch_id, index) + }; + + self.owned_bytes += new_top_k.owned_size(); + + // put the new row into the heap + self.inner.push(new_top_k) + } + + /// Returns the values stored in this heap, from values low to + /// high, as a single [`RecordBatch`], resetting the inner heap + pub fn emit(&mut self) -> Result { + Ok(self.emit_with_state()?.0) + } + + /// Returns the values stored in this heap, from values low to + /// high, as a single [`RecordBatch`], and a sorted vec of the + /// current heap's contents + pub fn emit_with_state(&mut self) -> Result<(RecordBatch, Vec)> { + let schema = self.store.schema().clone(); + + // generate sorted rows + let topk_rows = std::mem::take(&mut self.inner).into_sorted_vec(); + + if self.store.is_empty() { + return Ok((RecordBatch::new_empty(schema), topk_rows)); + } + + // Indices for each row within its respective RecordBatch + let indices: Vec<_> = topk_rows + .iter() + .enumerate() + .map(|(i, k)| (i, k.index)) + .collect(); + + let num_columns = schema.fields().len(); + + // build the output columns one at time, using the + // `interleave` kernel to pick rows from different arrays + let output_columns: Vec<_> = (0..num_columns) + .map(|col| { + let input_arrays: Vec<_> = topk_rows + .iter() + .map(|k| { + let entry = + self.store.get(k.batch_id).expect("invalid stored batch id"); + entry.batch.column(col) as &dyn Array + }) + .collect(); + + // at this point `indices` contains indexes within the + // rows and `input_arrays` contains a reference to the + // relevant Array for that index. `interleave` pulls + // them together into a single new array + Ok(interleave(&input_arrays, &indices)?) + }) + .collect::>()?; + + let new_batch = RecordBatch::try_new(schema, output_columns)?; + Ok((new_batch, topk_rows)) + } + + /// Compact this heap, rewriting all stored batches into a single + /// input batch + pub fn maybe_compact(&mut self) -> Result<()> { + // we compact if the number of "unused" rows in the store is + // past some pre-defined threshold. Target holding up to + // around 20 batches, but handle cases of large k where some + // batches might be partially full + let max_unused_rows = (20 * self.batch_size) + self.k; + let unused_rows = self.store.unused_rows(); + + // don't compact if the store has one extra batch or + // unused rows is under the threshold + if self.store.len() <= 2 || unused_rows < max_unused_rows { + return Ok(()); + } + // at first, compact the entire thing always into a new batch + // (maybe we can get fancier in the future about ignoring + // batches that have a high usage ratio already + + // Note: new batch is in the same order as inner + let num_rows = self.inner.len(); + let (new_batch, mut topk_rows) = self.emit_with_state()?; + + // clear all old entires in store (this invalidates all + // store_ids in `inner`) + self.store.clear(); + + let mut batch_entry = self.register_batch(new_batch); + batch_entry.uses = num_rows; + + // rewrite all existing entries to use the new batch, and + // remove old entries. The sortedness and their relative + // position do not change + for (i, topk_row) in topk_rows.iter_mut().enumerate() { + topk_row.batch_id = batch_entry.id; + topk_row.index = i; + } + self.insert_batch_entry(batch_entry); + // restore the heap + self.inner = BinaryHeap::from(topk_rows); + + Ok(()) + } + + /// return the size of memory used by this heap, in bytes + fn size(&self) -> usize { + std::mem::size_of::() + + (self.inner.capacity() * std::mem::size_of::()) + + self.store.size() + + self.owned_bytes + } +} + +/// Represents one of the top K rows held in this heap. Orders +/// according to memcmp of row (e.g. the arrow Row format, but could +/// also be primtive values) +/// +/// Reuses allocations to minimize runtime overhead of creating new Vecs +#[derive(Debug, PartialEq)] +struct TopKRow { + /// the value of the sort key for this row. This contains the + /// bytes that could be stored in `OwnedRow` but uses `Vec` to + /// reuse allocations. + row: Vec, + /// the RecordBatch this row came from: an id into a [`RecordBatchStore`] + batch_id: u32, + /// the index in this record batch the row came from + index: usize, +} + +impl TopKRow { + /// Create a new TopKRow with new allocation + fn new(row: impl AsRef<[u8]>, batch_id: u32, index: usize) -> Self { + Self { + row: row.as_ref().to_vec(), + batch_id, + index, + } + } + + /// Create a new TopKRow reusing the existing allocation + fn with_new_row( + self, + new_row: impl AsRef<[u8]>, + batch_id: u32, + index: usize, + ) -> Self { + let Self { + mut row, + batch_id: _, + index: _, + } = self; + row.clear(); + row.extend_from_slice(new_row.as_ref()); + + Self { + row, + batch_id, + index, + } + } + + /// Returns the number of bytes owned by this row in the heap (not + /// including itself) + fn owned_size(&self) -> usize { + self.row.capacity() + } + + /// Returns a slice to the owned row value + fn row(&self) -> &[u8] { + self.row.as_slice() + } +} + +impl Eq for TopKRow {} + +impl PartialOrd for TopKRow { + fn partial_cmp(&self, other: &Self) -> Option { + Some(self.cmp(other)) + } +} + +impl Ord for TopKRow { + fn cmp(&self, other: &Self) -> Ordering { + self.row.cmp(&other.row) + } +} + +#[derive(Debug)] +struct RecordBatchEntry { + id: u32, + batch: RecordBatch, + // for this batch, how many times has it been used + uses: usize, +} + +/// This structure tracks [`RecordBatch`] by an id so that: +/// +/// 1. The baches can be tracked via an id that can be copied cheaply +/// 2. The total memory held by all batches is tracked +#[derive(Debug)] +struct RecordBatchStore { + /// id generator + next_id: u32, + /// storage + batches: HashMap, + /// total size of all record batches tracked by this store + batches_size: usize, + /// schema of the batches + schema: SchemaRef, +} + +impl RecordBatchStore { + fn new(schema: SchemaRef) -> Self { + Self { + next_id: 0, + batches: HashMap::new(), + batches_size: 0, + schema, + } + } + + /// Register this batch with the store and assign an ID. No + /// attempt is made to compare this batch to other batches + pub fn register(&mut self, batch: RecordBatch) -> RecordBatchEntry { + let id = self.next_id; + self.next_id += 1; + RecordBatchEntry { id, batch, uses: 0 } + } + + /// Insert a record batch entry into this store, tracking its + /// memory use, if it has any uses + pub fn insert(&mut self, entry: RecordBatchEntry) { + // uses of 0 means that none of the rows in the batch were stored in the topk + if entry.uses > 0 { + self.batches_size += entry.batch.get_array_memory_size(); + self.batches.insert(entry.id, entry); + } + } + + /// Clear all values in this store, invalidating all previous batch ids + fn clear(&mut self) { + self.batches.clear(); + self.batches_size = 0; + } + + fn get(&self, id: u32) -> Option<&RecordBatchEntry> { + self.batches.get(&id) + } + + /// returns the total number of batches stored in this store + fn len(&self) -> usize { + self.batches.len() + } + + /// Returns the total number of rows in batches minus the number + /// which are in use + fn unused_rows(&self) -> usize { + self.batches + .values() + .map(|batch_entry| batch_entry.batch.num_rows() - batch_entry.uses) + .sum() + } + + /// returns true if the store has nothing stored + fn is_empty(&self) -> bool { + self.batches.is_empty() + } + + /// return the schema of batches stored + fn schema(&self) -> &SchemaRef { + &self.schema + } + + /// remove a use from the specified batch id. If the use count + /// reaches zero the batch entry is removed from the store + /// + /// panics if there were no remaining uses of id + pub fn unuse(&mut self, id: u32) { + let remove = if let Some(batch_entry) = self.batches.get_mut(&id) { + batch_entry.uses = batch_entry.uses.checked_sub(1).expect("underflow"); + batch_entry.uses == 0 + } else { + panic!("No entry for id {id}"); + }; + + if remove { + let old_entry = self.batches.remove(&id).unwrap(); + self.batches_size = self + .batches_size + .checked_sub(old_entry.batch.get_array_memory_size()) + .unwrap(); + } + } + + /// returns the size of memory used by this store, including all + /// referenced `RecordBatch`es, in bytes + pub fn size(&self) -> usize { + std::mem::size_of::() + + self.batches.capacity() + * (std::mem::size_of::() + std::mem::size_of::()) + + self.batches_size + } +} diff --git a/datafusion/sqllogictest/test_files/decimal.slt b/datafusion/sqllogictest/test_files/decimal.slt index a326a0cc4941..d7632138a84e 100644 --- a/datafusion/sqllogictest/test_files/decimal.slt +++ b/datafusion/sqllogictest/test_files/decimal.slt @@ -507,27 +507,26 @@ select * from decimal_simple where c1 >= 0.00004 order by c1; query RRIBR -select * from decimal_simple where c1 >= 0.00004 order by c1 limit 10; +select * from decimal_simple where c1 >= 0.00004 order by c1, c3 limit 10; ---- 0.00004 0.000000000004 5 true 0.000044 +0.00004 0.000000000004 8 false 0.000044 0.00004 0.000000000004 12 false 0.00004 0.00004 0.000000000004 14 true 0.00004 -0.00004 0.000000000004 8 false 0.000044 -0.00005 0.000000000005 9 true 0.000052 +0.00005 0.000000000005 1 false 0.0001 0.00005 0.000000000005 4 true 0.000078 0.00005 0.000000000005 8 false 0.000033 +0.00005 0.000000000005 9 true 0.000052 0.00005 0.000000000005 100 true 0.000068 -0.00005 0.000000000005 1 false 0.0001 - query RRIBR -select * from decimal_simple where c1 >= 0.00004 order by c1 limit 5; +select * from decimal_simple where c1 >= 0.00004 order by c1, c3 limit 5; ---- 0.00004 0.000000000004 5 true 0.000044 +0.00004 0.000000000004 8 false 0.000044 0.00004 0.000000000004 12 false 0.00004 0.00004 0.000000000004 14 true 0.00004 -0.00004 0.000000000004 8 false 0.000044 -0.00005 0.000000000005 9 true 0.000052 +0.00005 0.000000000005 1 false 0.0001 query RRIBR diff --git a/datafusion/sqllogictest/test_files/topk.slt b/datafusion/sqllogictest/test_files/topk.slt new file mode 100644 index 000000000000..8d3b70139d35 --- /dev/null +++ b/datafusion/sqllogictest/test_files/topk.slt @@ -0,0 +1,232 @@ +# Licensed to the Apache Software Foundation (ASF) under one +# or more contributor license agreements. See the NOTICE file +# distributed with this work for additional information +# regarding copyright ownership. The ASF licenses this file +# to you under the Apache License, Version 2.0 (the +# "License"); you may not use this file except in compliance +# with the License. You may obtain a copy of the License at + +# http://www.apache.org/licenses/LICENSE-2.0 + +# Unless required by applicable law or agreed to in writing, +# software distributed under the License is distributed on an +# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY +# KIND, either express or implied. See the License for the +# specific language governing permissions and limitations +# under the License. + +# Tests for development + +statement ok +create table topk(x int) as values (10), (2), (3), (0), (5), (4), (3), (2), (1), (3), (8); + +query I +select * from topk order by x; +---- +0 +1 +2 +2 +3 +3 +3 +4 +5 +8 +10 + +query I +select * from topk order by x limit 3; +---- +0 +1 +2 + +query I +select * from topk order by x desc limit 3; +---- +10 +8 +5 + + + + +statement ok +CREATE EXTERNAL TABLE aggregate_test_100 ( + c1 VARCHAR NOT NULL, + c2 TINYINT NOT NULL, + c3 SMALLINT NOT NULL, + c4 SMALLINT, + c5 INT, + c6 BIGINT NOT NULL, + c7 SMALLINT NOT NULL, + c8 INT NOT NULL, + c9 BIGINT UNSIGNED NOT NULL, + c10 VARCHAR NOT NULL, + c11 FLOAT NOT NULL, + c12 DOUBLE NOT NULL, + c13 VARCHAR NOT NULL +) +STORED AS CSV +WITH HEADER ROW +LOCATION '../../testing/data/csv/aggregate_test_100.csv' + +query TT +explain select * from aggregate_test_100 ORDER BY c13 desc limit 5; +---- +logical_plan +Limit: skip=0, fetch=5 +--Sort: aggregate_test_100.c13 DESC NULLS FIRST, fetch=5 +----TableScan: aggregate_test_100 projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13] +physical_plan +GlobalLimitExec: skip=0, fetch=5 +--SortExec: fetch=5, expr=[c13@12 DESC] +----CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c1, c2, c3, c4, c5, c6, c7, c8, c9, c10, c11, c12, c13], has_header=true + + + + +query T +select c13 from aggregate_test_100 ORDER BY c13; +---- +0VVIHzxWtNOFLtnhjHEKjXaJOSLJfm +0keZ5G8BffGwgF2RwQD59TFzMStxCB +0og6hSkhbX8AC1ktFS4kounvTzy8Vo +1aOcrEGd0cOqZe2I5XBOm0nDcwtBZO +2T3wSlHdEmASmO0xcXHnndkKEt6bz8 +3BEOHQsMEFZ58VcNTOJYShTBpAPzbt +4HX6feIvmNXBN7XGqgO4YVBkhu8GDI +4JznSdBajNWhu4hRQwjV1FjTTxY68i +52mKlRE3aHCBZtjECq6sY9OqVf8Dze +56MZa5O1hVtX4c5sbnCfxuX5kDChqI +6FPJlLAcaQ5uokyOWZ9HGdLZObFvOZ +6WfVFBVGJSQb7FhA7E0lBwdvjfZnSW +6oIXZuIPIqEoPBvFmbt2Nxy3tryGUE +6x93sxYioWuq5c9Kkk8oTAAORM7cH0 +802bgTGl6Bk5TlkPYYTxp5JkKyaYUA +8LIh0b6jmDGm87BmIyjdxNIpX4ugjD +90gAtmGEeIqUTbo1ZrxCvWtsseukXC +9UbObCsVkmYpJGcGrgfK90qOnwb2Lj +AFGCj7OWlEB5QfniEFgonMq90Tq5uH +ALuRhobVWbnQTTWZdSOk0iVe8oYFhW +Amn2K87Db5Es3dFQO9cw9cvpAM6h35 +AyYVExXK6AR2qUTxNZ7qRHQOVGMLcz +BJqx5WokrmrrezZA0dUbleMYkG5U2O +BPtQMxnuSPpxMExYV9YkDa6cAN7GP3 +BsM5ZAYifRh5Lw3Y8X1r53I0cTJnfE +C2GT5KVyOPZpgKVl110TyZO0NcJ434 +DuJNG8tufSqW0ZstHqWj3aGvFLMg4A +EcCuckwsF3gV1Ecgmh5v4KM8g1ozif +ErJFw6hzZ5fmI5r8bhE4JzlscnhKZU +F7NSTjWvQJyBburN7CXRUlbgp2dIrA +Fi4rJeTQq4eXj8Lxg3Hja5hBVTVV5u +H5j5ZHy1FGesOAHjkQEDYCucbpKWRu +HKSMQ9nTnwXCJIte1JrM1dtYnDtJ8g +IWl0G3ZlMNf7WT8yjIB49cx7MmYOmr +IZTkHMLvIKuiLjhDjYMmIHxh166we4 +Ig1QcuKsjHXkproePdERo2w0mYzIqd +JHNgc2UCaiXOdmkxwDDyGhRlO0mnBQ +JN0VclewmjwYlSl8386MlWv5rEhWCz +JafwVLSVk5AVoXFuzclesQ000EE2k1 +KJFcmTVjdkCMv94wYCtfHMFhzyRsmH +Ktb7GQ0N1DrxwkCkEUsTaIXk0xYinn +Ld2ej8NEv5zNcqU60FwpHeZKBhfpiV +LiEBxds3X0Uw0lxiYjDqrkAaAwoiIW +MXhhH1Var3OzzJCtI9VNyYvA0q8UyJ +MeSTAXq8gVxVjbEjgkvU9YLte0X9uE +NEhyk8uIx4kEULJGa8qIyFjjBcP2G6 +O66j6PaYuZhEUtqV6fuU7TyjM2WxC5 +OF7fQ37GzaZ5ikA2oMyvleKtgnLjXh +OPwBqCEK5PWTjWaiOyL45u2NLTaDWv +Oq6J4Rx6nde0YlhOIJkFsX2MsSvAQ0 +Ow5PGpfTm4dXCfTDsXAOTatXRoAydR +QEHVvcP8gxI6EMJIrvcnIhgzPNjIvv +QJYm7YRA3YetcBHI5wkMZeLXVmfuNy +QYlaIAnJA6r8rlAb6f59wcxvcPcWFf +RilTlL1tKkPOUFuzmLydHAVZwv1OGl +Sfx0vxv1skzZWT1PqVdoRDdO6Sb6xH +TTQUwpMNSXZqVBKAFvXu7OlWvKXJKX +TtDKUZxzVxsq758G6AWPSYuZgVgbcl +VDhtJkYjAYPykCgOU9x3v7v3t4SO1a +VY0zXmXeksCT8BzvpzpPLbmU9Kp9Y4 +Vp3gmWunM5A7wOC9YW2JroFqTWjvTi +WHmjWk2AY4c6m7DA4GitUx6nmb1yYS +XemNcT1xp61xcM1Qz3wZ1VECCnq06O +Z2sWcQr0qyCJRMHDpRy3aQr7PkHtkK +aDxBtor7Icd9C5hnTvvw5NrIre740e +akiiY5N0I44CMwEnBL6RTBk7BRkxEj +b3b9esRhTzFEawbs6XhpKnD9ojutHB +bgK1r6v3BCTh0aejJUhkA1Hn6idXGp +cBGc0kSm32ylBDnxogG727C0uhZEYZ +cq4WSAIFwx3wwTUS5bp1wCe71R6U5I +dVdvo6nUD5FgCgsbOZLds28RyGTpnx +e2Gh6Ov8XkXoFdJWhl0EjwEHlMDYyG +f9ALCzwDAKmdu7Rk2msJaB1wxe5IBX +fuyvs0w7WsKSlXqJ1e6HFSoLmx03AG +gTpyQnEODMcpsPnJMZC66gh33i3m0b +gpo8K5qtYePve6jyPt6xgJx4YOVjms +gxfHWUF8XgY2KdFxigxvNEXe2V2XMl +i6RQVXKUh7MzuGMDaNclUYnFUAireU +ioEncce3mPOXD2hWhpZpCPWGATG6GU +jQimhdepw3GKmioWUlVSWeBVRKFkY3 +l7uwDoTepWwnAP0ufqtHJS3CRi7RfP +lqhzgLsXZ8JhtpeeUWWNbMz8PHI705 +m6jD0LBIQWaMfenwRCTANI9eOdyyto +mhjME0zBHbrK6NMkytMTQzOssOa1gF +mzbkwXKrPeZnxg2Kn1LRF5hYSsmksS +nYVJnVicpGRqKZibHyBAmtmzBXAFfT +oHJMNvWuunsIMIWFnYG31RCfkOo2V7 +oLZ21P2JEDooxV1pU31cIxQHEeeoLu +okOkcWflkNXIy4R8LzmySyY1EC3sYd +pLk3i59bZwd5KBZrI1FiweYTd5hteG +pTeu0WMjBRTaNRT15rLCuEh3tBJVc5 +qnPOOmslCJaT45buUisMRnM0rc77EK +t6fQUjJejPcjc04wHvHTPe55S65B4V +ukOiFGGFnQJDHFgZxHMpvhD3zybF0M +ukyD7b0Efj7tNlFSRmzZ0IqkEzg2a8 +waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs +wwXqSGKLyBQyPkonlzBNYUJTCo4LRS +xipQ93429ksjNcXPX5326VSg1xJZcW +y7C453hRWd4E7ImjNDWlpexB8nUqjh +ydkwycaISlYSlEq3TlkS2m15I2pcp8 + + +query TIIIIIIIITRRT +select * from aggregate_test_100 ORDER BY c13 desc limit 5; +---- +a 4 -38 20744 762932956 308913475857409919 7 45465 1787652631 878137512938218976 0.7459874 0.021825780392 ydkwycaISlYSlEq3TlkS2m15I2pcp8 +d 1 -98 13630 -1991133944 1184110014998006843 220 2986 225513085 9634106610243643486 0.89651865 0.164088254508 y7C453hRWd4E7ImjNDWlpexB8nUqjh +e 2 52 -12056 -1090239422 9011500141803970147 238 4168 2013662838 12565360638488684051 0.6694766 0.391444365692 xipQ93429ksjNcXPX5326VSg1xJZcW +d 1 -72 25590 1188089983 3090286296481837049 241 832 3542840110 5885937420286765261 0.41980565 0.215354023438 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS +a 1 -5 12636 794623392 2909750622865366631 15 24022 2669374863 4776679784701509574 0.29877836 0.253725340799 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + + + +## -- make tiny batches to trigger batch compaction +statement ok +set datafusion.execution.batch_size = 2 + +query TIIIIIIIITRRT +select * from aggregate_test_100 ORDER BY c13 desc limit 5; +---- +a 4 -38 20744 762932956 308913475857409919 7 45465 1787652631 878137512938218976 0.7459874 0.021825780392 ydkwycaISlYSlEq3TlkS2m15I2pcp8 +d 1 -98 13630 -1991133944 1184110014998006843 220 2986 225513085 9634106610243643486 0.89651865 0.164088254508 y7C453hRWd4E7ImjNDWlpexB8nUqjh +e 2 52 -12056 -1090239422 9011500141803970147 238 4168 2013662838 12565360638488684051 0.6694766 0.391444365692 xipQ93429ksjNcXPX5326VSg1xJZcW +d 1 -72 25590 1188089983 3090286296481837049 241 832 3542840110 5885937420286765261 0.41980565 0.215354023438 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS +a 1 -5 12636 794623392 2909750622865366631 15 24022 2669374863 4776679784701509574 0.29877836 0.253725340799 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs + + +## make an example for dictionary encoding + +statement ok +create table dict as select c1, c2, c3, c13, arrow_cast(c13, 'Dictionary(Int32, Utf8)') as c13_dict from aggregate_test_100; + +query TIIT? +select * from dict order by c13 desc limit 5; +---- +a 4 -38 ydkwycaISlYSlEq3TlkS2m15I2pcp8 ydkwycaISlYSlEq3TlkS2m15I2pcp8 +d 1 -98 y7C453hRWd4E7ImjNDWlpexB8nUqjh y7C453hRWd4E7ImjNDWlpexB8nUqjh +e 2 52 xipQ93429ksjNcXPX5326VSg1xJZcW xipQ93429ksjNcXPX5326VSg1xJZcW +d 1 -72 wwXqSGKLyBQyPkonlzBNYUJTCo4LRS wwXqSGKLyBQyPkonlzBNYUJTCo4LRS +a 1 -5 waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs waIGbOGl1PM6gnzZ4uuZt4E2yDWRHs diff --git a/datafusion/sqllogictest/test_files/window.slt b/datafusion/sqllogictest/test_files/window.slt index b6325fd889ec..5fb5a04c6709 100644 --- a/datafusion/sqllogictest/test_files/window.slt +++ b/datafusion/sqllogictest/test_files/window.slt @@ -2601,6 +2601,7 @@ SELECT # test_source_sorted_builtin query TT EXPLAIN SELECT + ts, FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1, FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2, LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1, @@ -2630,24 +2631,23 @@ EXPLAIN SELECT LIMIT 5; ---- logical_plan -Projection: fv1, fv2, lv1, lv2, nv1, nv2, rn1, rn2, rank1, rank2, dense_rank1, dense_rank2, lag1, lag2, lead1, lead2, fvr1, fvr2, lvr1, lvr2, lagr1, lagr2, leadr1, leadr2 ---Limit: skip=0, fetch=5 -----Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5 -------Projection: FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv2, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv1, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv2, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rn1, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rn2, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rank1, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rank2, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS dense_rank1, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS dense_rank2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lag1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lag2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lead1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lead2, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lagr1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lagr2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS leadr1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS leadr2, annotated_data_finite.ts ---------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] -----------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] -------------TableScan: annotated_data_finite projection=[ts, inc_col] +Limit: skip=0, fetch=5 +--Sort: annotated_data_finite.ts DESC NULLS FIRST, fetch=5 +----Projection: annotated_data_finite.ts, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lv2, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv1, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS nv2, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rn1, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rn2, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS rank1, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS rank2, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS dense_rank1, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS dense_rank2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lag1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lag2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lead1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lead2, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS fvr2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lvr2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS lagr1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS lagr2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING AS leadr1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING AS leadr2 +------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, NTH_VALUE(annotated_data_finite.inc_col, Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] +--------WindowAggr: windowExpr=[[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(1), Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LAG(annotated_data_finite.inc_col, Int64(2), Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(-1), Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING, LEAD(annotated_data_finite.inc_col, Int64(4), Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING]] +----------TableScan: annotated_data_finite projection=[ts, inc_col] physical_plan -ProjectionExec: expr=[fv1@0 as fv1, fv2@1 as fv2, lv1@2 as lv1, lv2@3 as lv2, nv1@4 as nv1, nv2@5 as nv2, rn1@6 as rn1, rn2@7 as rn2, rank1@8 as rank1, rank2@9 as rank2, dense_rank1@10 as dense_rank1, dense_rank2@11 as dense_rank2, lag1@12 as lag1, lag2@13 as lag2, lead1@14 as lead1, lead2@15 as lead2, fvr1@16 as fvr1, fvr2@17 as fvr2, lvr1@18 as lvr1, lvr2@19 as lvr2, lagr1@20 as lagr1, lagr2@21 as lagr2, leadr1@22 as leadr1, leadr2@23 as leadr2] ---GlobalLimitExec: skip=0, fetch=5 -----SortExec: fetch=5, expr=[ts@24 DESC] -------ProjectionExec: expr=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv2, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as nv1, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as nv2, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as rn1, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@17 as rn2, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@18 as rank1, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@19 as rank2, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@20 as dense_rank1, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@21 as dense_rank2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as lag1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@23 as lag2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@24 as lead1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@25 as lead2, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@2 as fvr1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@3 as fvr2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lvr1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lvr2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@6 as lagr1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as lagr2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as leadr1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@9 as leadr2, ts@0 as ts] ---------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }], mode=[Sorted] -----------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }], mode=[Sorted] -------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true +GlobalLimitExec: skip=0, fetch=5 +--SortExec: fetch=5, expr=[ts@0 DESC] +----ProjectionExec: expr=[ts@0 as ts, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@10 as fv1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@11 as fv2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@12 as lv1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@13 as lv2, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@14 as nv1, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@15 as nv2, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@16 as rn1, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@17 as rn2, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@18 as rank1, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@19 as rank2, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@20 as dense_rank1, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@21 as dense_rank2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@22 as lag1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@23 as lag2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@24 as lead1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@25 as lead2, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@2 as fvr1, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@3 as fvr2, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING@4 as lvr1, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@5 as lvr2, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@6 as lagr1, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@7 as lagr2, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING@8 as leadr1, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING@9 as leadr2] +------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "NTH_VALUE(annotated_data_finite.inc_col,Int64(5)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "ROW_NUMBER() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "DENSE_RANK() ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: UInt64, nullable: false, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts ASC NULLS LAST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(10)), end_bound: Following(UInt64(1)) }], mode=[Sorted] +--------BoundedWindowAggExec: wdw=[FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "FIRST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(1)), end_bound: Following(Int32(10)) }, LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAST_VALUE(annotated_data_finite.inc_col) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LAG(annotated_data_finite.inc_col,Int64(2),Int64(1002)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }, LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(-1),Int64(1001)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] RANGE BETWEEN 1 PRECEDING AND 10 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Int32(10)), end_bound: Following(Int32(1)) }, LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING: Ok(Field { name: "LEAD(annotated_data_finite.inc_col,Int64(4),Int64(1004)) ORDER BY [annotated_data_finite.ts DESC NULLS FIRST] ROWS BETWEEN 10 PRECEDING AND 1 FOLLOWING", data_type: Int32, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(1)), end_bound: Following(UInt64(10)) }], mode=[Sorted] +----------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/datafusion/core/tests/data/window_1.csv]]}, projection=[ts, inc_col], output_ordering=[ts@0 ASC NULLS LAST], has_header=true -query IIIIIIIIIIIIIIIIIIIIIIII +query IIIIIIIIIIIIIIIIIIIIIIIII SELECT + ts, FIRST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv1, FIRST_VALUE(inc_col) OVER(ORDER BY ts ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as fv2, LAST_VALUE(inc_col) OVER(ORDER BY ts RANGE BETWEEN 10 PRECEDING and 1 FOLLOWING) as lv1, @@ -2673,14 +2673,14 @@ SELECT LEAD(inc_col, -1, 1001) OVER(ORDER BY ts DESC RANGE BETWEEN 1 PRECEDING and 10 FOLLOWING) AS leadr1, LEAD(inc_col, 4, 1004) OVER(ORDER BY ts DESC ROWS BETWEEN 10 PRECEDING and 1 FOLLOWING) as leadr2 FROM annotated_data_finite - ORDER BY ts DESC + ORDER BY ts DESC, fv2 LIMIT 5; ---- -289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289 -289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286 -289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283 -286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278 -275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275 +264 289 266 305 305 305 278 99 99 99 99 86 86 296 291 296 1004 305 305 301 296 305 1002 305 286 +264 289 269 305 305 305 283 100 100 99 99 86 86 301 296 301 1004 305 305 301 301 1001 1002 1001 289 +262 289 261 296 301 NULL 275 98 98 98 98 85 85 291 289 291 1004 305 305 296 291 301 305 301 283 +258 286 259 291 296 NULL 272 97 97 97 97 84 84 289 286 289 1004 305 305 291 289 296 301 296 278 +254 275 254 289 291 289 269 96 96 96 96 83 83 286 283 286 305 305 305 289 286 291 296 291 275 # test_source_sorted_unbounded_preceding @@ -3271,25 +3271,27 @@ drop table annotated_data_infinite2 # window3 spec is not used in window functions. # The query should still work. -query RR +query IRR SELECT - MAX(c12) OVER window1, - MIN(c12) OVER window2 as max1 + C3, + MAX(c12) OVER window1 as max1, + MIN(c12) OVER window2 as max2 FROM aggregate_test_100 WINDOW window1 AS (ORDER BY C12), window2 AS (PARTITION BY C11), window3 AS (ORDER BY C1) - ORDER BY C3 + ORDER BY C3, max2 LIMIT 5 ---- -0.970671228336 0.970671228336 -0.850672105305 0.850672105305 -0.152498292972 0.152498292972 -0.369363046006 0.369363046006 -0.56535284223 0.56535284223 +-117 0.850672105305 0.850672105305 +-117 0.970671228336 0.970671228336 +-111 0.152498292972 0.152498292972 +-107 0.369363046006 0.369363046006 +-106 0.56535284223 0.56535284223 query TT EXPLAIN SELECT + C3, MAX(c12) OVER window1 as min1, MIN(c12) OVER window2 as max1 FROM aggregate_test_100 @@ -3300,42 +3302,41 @@ EXPLAIN SELECT LIMIT 5 ---- logical_plan -Projection: min1, max1 ---Limit: skip=0, fetch=5 -----Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5 -------Projection: MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max1, aggregate_test_100.c3 ---------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] -----------Projection: aggregate_test_100.c3, aggregate_test_100.c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING -------------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] ---------------TableScan: aggregate_test_100 projection=[c3, c11, c12] +Limit: skip=0, fetch=5 +--Sort: aggregate_test_100.c3 ASC NULLS LAST, fetch=5 +----Projection: aggregate_test_100.c3, MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW AS min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING AS max1 +------WindowAggr: windowExpr=[[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW]] +--------Projection: aggregate_test_100.c3, aggregate_test_100.c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING +----------WindowAggr: windowExpr=[[MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING]] +------------TableScan: aggregate_test_100 projection=[c3, c11, c12] physical_plan -ProjectionExec: expr=[min1@0 as min1, max1@1 as max1] ---GlobalLimitExec: skip=0, fetch=5 -----SortExec: fetch=5, expr=[c3@2 ASC NULLS LAST] -------ProjectionExec: expr=[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as max1, c3@0 as c3] ---------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }], mode=[Sorted] -----------SortExec: expr=[c12@1 ASC NULLS LAST] -------------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING] ---------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }] -----------------SortExec: expr=[c11@1 ASC NULLS LAST] -------------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, c11, c12], has_header=true +GlobalLimitExec: skip=0, fetch=5 +--SortExec: fetch=5, expr=[c3@0 ASC NULLS LAST] +----ProjectionExec: expr=[c3@0 as c3, MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW@3 as min1, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@2 as max1] +------BoundedWindowAggExec: wdw=[MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW: Ok(Field { name: "MAX(aggregate_test_100.c12) ORDER BY [aggregate_test_100.c12 ASC NULLS LAST] RANGE BETWEEN UNBOUNDED PRECEDING AND CURRENT ROW", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Range, start_bound: Preceding(Float64(NULL)), end_bound: CurrentRow }], mode=[Sorted] +--------SortExec: expr=[c12@1 ASC NULLS LAST] +----------ProjectionExec: expr=[c3@0 as c3, c12@2 as c12, MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING@3 as MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING] +------------WindowAggExec: wdw=[MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING: Ok(Field { name: "MIN(aggregate_test_100.c12) PARTITION BY [aggregate_test_100.c11] ROWS BETWEEN UNBOUNDED PRECEDING AND UNBOUNDED FOLLOWING", data_type: Float64, nullable: true, dict_id: 0, dict_is_ordered: false, metadata: {} }), frame: WindowFrame { units: Rows, start_bound: Preceding(UInt64(NULL)), end_bound: Following(UInt64(NULL)) }] +--------------SortExec: expr=[c11@1 ASC NULLS LAST] +----------------CsvExec: file_groups={1 group: [[WORKSPACE_ROOT/testing/data/csv/aggregate_test_100.csv]]}, projection=[c3, c11, c12], has_header=true # window1 spec is used multiple times under different aggregations. # The query should still work. -query RR +query IRR SELECT + C3, MAX(c12) OVER window1 as min1, MIN(c12) OVER window1 as max1 FROM aggregate_test_100 WINDOW window1 AS (ORDER BY C12) - ORDER BY C3 + ORDER BY C3, min1 LIMIT 5 ---- -0.970671228336 0.014793053078 -0.850672105305 0.014793053078 -0.152498292972 0.014793053078 -0.369363046006 0.014793053078 -0.56535284223 0.014793053078 +-117 0.850672105305 0.014793053078 +-117 0.970671228336 0.014793053078 +-111 0.152498292972 0.014793053078 +-107 0.369363046006 0.014793053078 +-106 0.56535284223 0.014793053078 query TT EXPLAIN SELECT