diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index fa20ca6d3843..cd58e9d6c60d 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -63,9 +63,10 @@ impl ParallelizeScan { ); // update the partition ranges - region_scan_exec - .set_partitions(partition_ranges) + let new_exec = region_scan_exec + .with_new_partitions(partition_ranges) .map_err(|e| DataFusionError::External(e.into_inner()))?; + return Ok(Transformed::yes(Arc::new(new_exec))); } // The plan might be modified, but it's modified in-place so we always return @@ -80,11 +81,15 @@ impl ParallelizeScan { /// Distribute [`PartitionRange`]s to each partition. /// /// Currently we use a simple round-robin strategy to assign ranges to partitions. + /// This method may return partitions with smaller number than `expected_partition_num` + /// if the number of ranges is smaller than `expected_partition_num`. But this will + /// return at least one partition. fn assign_partition_range( ranges: Vec, expected_partition_num: usize, ) -> Vec> { - let mut partition_ranges = vec![vec![]; expected_partition_num]; + let actual_partition_num = expected_partition_num.min(ranges.len()).max(1); + let mut partition_ranges = vec![vec![]; actual_partition_num]; // round-robin assignment for (i, range) in ranges.into_iter().enumerate() { @@ -95,3 +100,112 @@ impl ParallelizeScan { partition_ranges } } + +#[cfg(test)] +mod test { + use common_time::timestamp::TimeUnit; + use common_time::Timestamp; + + use super::*; + + #[test] + fn test_assign_partition_range() { + let ranges = vec![ + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + estimated_size: 100, + identifier: 1, + }, + PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + estimated_size: 200, + identifier: 2, + }, + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + estimated_size: 150, + identifier: 3, + }, + PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + estimated_size: 250, + identifier: 4, + }, + ]; + + // assign to 2 partitions + let expected_partition_num = 2; + let result = + ParallelizeScan::assign_partition_range(ranges.clone(), expected_partition_num); + let expected = vec![ + vec![ + PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + estimated_size: 100, + identifier: 1, + }, + PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + estimated_size: 150, + identifier: 3, + }, + ], + vec![ + PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + estimated_size: 200, + identifier: 2, + }, + PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + estimated_size: 250, + identifier: 4, + }, + ], + ]; + assert_eq!(result, expected); + + // assign 4 ranges to 5 partitions. Only 4 partitions are returned. + let expected_partition_num = 5; + let result = ParallelizeScan::assign_partition_range(ranges, expected_partition_num); + let expected = vec![ + vec![PartitionRange { + start: Timestamp::new(0, TimeUnit::Second), + end: Timestamp::new(10, TimeUnit::Second), + estimated_size: 100, + identifier: 1, + }], + vec![PartitionRange { + start: Timestamp::new(10, TimeUnit::Second), + end: Timestamp::new(20, TimeUnit::Second), + estimated_size: 200, + identifier: 2, + }], + vec![PartitionRange { + start: Timestamp::new(20, TimeUnit::Second), + end: Timestamp::new(30, TimeUnit::Second), + estimated_size: 150, + identifier: 3, + }], + vec![PartitionRange { + start: Timestamp::new(30, TimeUnit::Second), + end: Timestamp::new(40, TimeUnit::Second), + estimated_size: 250, + identifier: 4, + }], + ]; + assert_eq!(result, expected); + + // assign 0 ranges to 5 partitions. Only 1 partition is returned. + let result = ParallelizeScan::assign_partition_range(vec![], 5); + assert_eq!(result.len(), 1); + } +} diff --git a/src/store-api/src/region_engine.rs b/src/store-api/src/region_engine.rs index ffefc1febdcd..856396ca6e6e 100644 --- a/src/store-api/src/region_engine.rs +++ b/src/store-api/src/region_engine.rs @@ -143,7 +143,7 @@ impl ScannerPartitioning { } /// Represents one data range within a partition -#[derive(Debug, Clone, Copy)] +#[derive(Debug, Clone, Copy, PartialEq, Eq)] pub struct PartitionRange { /// Start time of time index column. Inclusive. pub start: Timestamp, diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 83c348d6d92b..19283058c6e1 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -41,7 +41,7 @@ use crate::table::metrics::StreamMetrics; /// A plan to read multiple partitions from a region of a table. #[derive(Debug)] pub struct RegionScanExec { - scanner: Mutex, + scanner: Arc>, arrow_schema: ArrowSchemaRef, /// The expected output ordering for the plan. output_ordering: Option>, @@ -70,7 +70,7 @@ impl RegionScanExec { let append_mode = scanner_props.append_mode(); let total_rows = scanner_props.total_rows(); Self { - scanner: Mutex::new(scanner), + scanner: Arc::new(Mutex::new(scanner)), arrow_schema, output_ordering: None, metric: ExecutionPlanMetricsSet::new(), @@ -102,9 +102,28 @@ impl RegionScanExec { } /// Update the partition ranges of underlying scanner. - pub fn set_partitions(&self, partitions: Vec>) -> Result<(), BoxedError> { - let mut scanner = self.scanner.lock().unwrap(); - scanner.prepare(partitions) + pub fn with_new_partitions( + &self, + partitions: Vec>, + ) -> Result { + let num_partitions = partitions.len(); + let mut properties = self.properties.clone(); + properties.partitioning = Partitioning::UnknownPartitioning(num_partitions); + + { + let mut scanner = self.scanner.lock().unwrap(); + scanner.prepare(partitions)?; + } + + Ok(Self { + scanner: self.scanner.clone(), + arrow_schema: self.arrow_schema.clone(), + output_ordering: self.output_ordering.clone(), + metric: self.metric.clone(), + properties, + append_mode: self.append_mode, + total_rows: self.total_rows, + }) } }