From ed906df6c2b8a8bf7b159acfbd6f7eae0a84ca32 Mon Sep 17 00:00:00 2001 From: Ruihang Xia Date: Tue, 27 Aug 2024 19:23:40 +0800 Subject: [PATCH] fix: update properties on updating partitions Signed-off-by: Ruihang Xia --- src/query/src/optimizer/parallelize_scan.rs | 5 ++-- src/table/src/table/scan.rs | 29 +++++++++++++++++---- 2 files changed, 27 insertions(+), 7 deletions(-) diff --git a/src/query/src/optimizer/parallelize_scan.rs b/src/query/src/optimizer/parallelize_scan.rs index fa20ca6d3843..f8523e2cf11b 100644 --- a/src/query/src/optimizer/parallelize_scan.rs +++ b/src/query/src/optimizer/parallelize_scan.rs @@ -63,9 +63,10 @@ impl ParallelizeScan { ); // update the partition ranges - region_scan_exec - .set_partitions(partition_ranges) + let new_exec = region_scan_exec + .with_new_partitions(partition_ranges) .map_err(|e| DataFusionError::External(e.into_inner()))?; + return Ok(Transformed::yes(Arc::new(new_exec))); } // The plan might be modified, but it's modified in-place so we always return diff --git a/src/table/src/table/scan.rs b/src/table/src/table/scan.rs index 83c348d6d92b..19283058c6e1 100644 --- a/src/table/src/table/scan.rs +++ b/src/table/src/table/scan.rs @@ -41,7 +41,7 @@ use crate::table::metrics::StreamMetrics; /// A plan to read multiple partitions from a region of a table. #[derive(Debug)] pub struct RegionScanExec { - scanner: Mutex, + scanner: Arc>, arrow_schema: ArrowSchemaRef, /// The expected output ordering for the plan. output_ordering: Option>, @@ -70,7 +70,7 @@ impl RegionScanExec { let append_mode = scanner_props.append_mode(); let total_rows = scanner_props.total_rows(); Self { - scanner: Mutex::new(scanner), + scanner: Arc::new(Mutex::new(scanner)), arrow_schema, output_ordering: None, metric: ExecutionPlanMetricsSet::new(), @@ -102,9 +102,28 @@ impl RegionScanExec { } /// Update the partition ranges of underlying scanner. - pub fn set_partitions(&self, partitions: Vec>) -> Result<(), BoxedError> { - let mut scanner = self.scanner.lock().unwrap(); - scanner.prepare(partitions) + pub fn with_new_partitions( + &self, + partitions: Vec>, + ) -> Result { + let num_partitions = partitions.len(); + let mut properties = self.properties.clone(); + properties.partitioning = Partitioning::UnknownPartitioning(num_partitions); + + { + let mut scanner = self.scanner.lock().unwrap(); + scanner.prepare(partitions)?; + } + + Ok(Self { + scanner: self.scanner.clone(), + arrow_schema: self.arrow_schema.clone(), + output_ordering: self.output_ordering.clone(), + metric: self.metric.clone(), + properties, + append_mode: self.append_mode, + total_rows: self.total_rows, + }) } }