diff --git a/src/mito2/src/read/scan_region.rs b/src/mito2/src/read/scan_region.rs index d0bef554727f..b13068dbec5a 100644 --- a/src/mito2/src/read/scan_region.rs +++ b/src/mito2/src/read/scan_region.rs @@ -25,7 +25,7 @@ use common_time::range::TimestampRange; use common_time::Timestamp; use datafusion::physical_plan::DisplayFormatType; use smallvec::SmallVec; -use store_api::region_engine::RegionScannerRef; +use store_api::region_engine::{PartitionRange, RegionScannerRef}; use store_api::storage::{ScanRequest, TimeSeriesRowSelector}; use table::predicate::{build_time_range_predicate, Predicate}; use tokio::sync::{mpsc, Mutex, Semaphore}; @@ -705,6 +705,37 @@ impl ScanInput { let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum(); rows_in_files + rows_in_memtables } + + /// Retrieves [`PartitionRange`] from memtable and files + pub(crate) fn partition_ranges(&self) -> Vec { + let mut id = 0; + let mut container = Vec::with_capacity(self.memtables.len() + self.files.len()); + + for memtable in &self.memtables { + let range = PartitionRange { + // TODO(ruihang): filter out empty memtables in the future. + start: memtable.stats().time_range().unwrap().0, + end: memtable.stats().time_range().unwrap().1, + num_rows: memtable.stats().num_rows(), + identifier: id, + }; + id += 1; + container.push(range); + } + + for file in &self.files { + let range = PartitionRange { + start: file.meta_ref().time_range.0, + end: file.meta_ref().time_range.1, + num_rows: file.meta_ref().num_rows as usize, + identifier: id, + }; + id += 1; + container.push(range); + } + + container + } } #[cfg(test)] diff --git a/src/mito2/src/read/seq_scan.rs b/src/mito2/src/read/seq_scan.rs index 954580986ec2..80fbb3189f43 100644 --- a/src/mito2/src/read/seq_scan.rs +++ b/src/mito2/src/read/seq_scan.rs @@ -67,10 +67,11 @@ impl SeqScan { /// Creates a new [SeqScan]. pub(crate) fn new(input: ScanInput) -> Self { let parallelism = input.parallelism.parallelism.max(1); - let properties = ScannerProperties::default() + let mut properties = ScannerProperties::default() .with_parallelism(parallelism) .with_append_mode(input.append_mode) .with_total_rows(input.total_rows()); + properties.partitions = vec![input.partition_ranges()]; let stream_ctx = Arc::new(StreamContext::new(input)); Self { diff --git a/src/mito2/src/test_util.rs b/src/mito2/src/test_util.rs index 8985baae7064..da0451bc1aa8 100644 --- a/src/mito2/src/test_util.rs +++ b/src/mito2/src/test_util.rs @@ -364,7 +364,7 @@ impl TestEnv { .as_path() .display() .to_string(); - let mut builder = Fs::default(); + let builder = Fs::default(); let object_store = ObjectStore::new(builder.root(&data_path)).unwrap().finish(); object_store_manager.add(storage_name, object_store); } diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index cd58e9d6c60d..d6797027966a 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -61,7 +61,6 @@ impl ParallelizeScan { debug!( "Assign {total_range_num} ranges to {expected_partition_num} partitions" ); - // update the partition ranges let new_exec = region_scan_exec .with_new_partitions(partition_ranges) @@ -114,25 +113,25 @@ mod test { PartitionRange { start: Timestamp::new(0, TimeUnit::Second), end: Timestamp::new(10, TimeUnit::Second), - estimated_size: 100, + num_rows: 100, identifier: 1, }, PartitionRange { start: Timestamp::new(10, TimeUnit::Second), end: Timestamp::new(20, TimeUnit::Second), - estimated_size: 200, + num_rows: 200, identifier: 2, }, PartitionRange { start: Timestamp::new(20, TimeUnit::Second), end: Timestamp::new(30, TimeUnit::Second), - estimated_size: 150, + num_rows: 150, identifier: 3, }, PartitionRange { start: Timestamp::new(30, TimeUnit::Second), end: Timestamp::new(40, TimeUnit::Second), - estimated_size: 250, + num_rows: 250, identifier: 4, }, ]; @@ -146,13 +145,13 @@ mod test { PartitionRange { start: Timestamp::new(0, TimeUnit::Second), end: Timestamp::new(10, TimeUnit::Second), - estimated_size: 100, + num_rows: 100, identifier: 1, }, PartitionRange { start: Timestamp::new(20, TimeUnit::Second), end: Timestamp::new(30, TimeUnit::Second), - estimated_size: 150, + num_rows: 150, identifier: 3, }, ], @@ -160,13 +159,13 @@ mod test { PartitionRange { start: Timestamp::new(10, TimeUnit::Second), end: Timestamp::new(20, TimeUnit::Second), - estimated_size: 200, + num_rows: 200, identifier: 2, }, PartitionRange { start: Timestamp::new(30, TimeUnit::Second), end: Timestamp::new(40, TimeUnit::Second), - estimated_size: 250, + num_rows: 250, identifier: 4, }, ], @@ -180,25 +179,25 @@ mod test { vec![PartitionRange { start: Timestamp::new(0, TimeUnit::Second), end: Timestamp::new(10, TimeUnit::Second), - estimated_size: 100, + num_rows: 100, identifier: 1, }], vec![PartitionRange { start: Timestamp::new(10, TimeUnit::Second), end: Timestamp::new(20, TimeUnit::Second), - estimated_size: 200, + num_rows: 200, identifier: 2, }], vec![PartitionRange { start: Timestamp::new(20, TimeUnit::Second), end: Timestamp::new(30, TimeUnit::Second), - estimated_size: 150, + num_rows: 150, identifier: 3, }], vec![PartitionRange { start: Timestamp::new(30, TimeUnit::Second), end: Timestamp::new(40, TimeUnit::Second), - estimated_size: 250, + num_rows: 250, identifier: 4, }], ]; diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index 856396ca6e6e..cf37fe82f9e6 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -149,9 +149,8 @@ pub struct PartitionRange { pub start: Timestamp, /// End time of time index column. Inclusive. pub end: Timestamp, - /// Estimate size of this range. Is used to balance ranges between partitions. - /// No base unit, just a number. - pub estimated_size: usize, + /// Number of rows in this range. Is used to balance ranges between partitions. + pub num_rows: usize, /// Identifier to this range. Assigned by storage engine. pub identifier: usize, }