Skip to content

Commit

Permalink
fix: update properties on updating partitions
Browse files Browse the repository at this point in the history
Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia committed Aug 27, 2024
1 parent 0b102ef commit ed906df
Show file tree
Hide file tree
Showing 2 changed files with 27 additions and 7 deletions.
5 changes: 3 additions & 2 deletions src/query/src/optimizer/parallelize_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ impl ParallelizeScan {
);

// update the partition ranges
region_scan_exec
.set_partitions(partition_ranges)
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
return Ok(Transformed::yes(Arc::new(new_exec)));
}

// The plan might be modified, but it's modified in-place so we always return
Expand Down
29 changes: 24 additions & 5 deletions src/table/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::table::metrics::StreamMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Debug)]
pub struct RegionScanExec {
scanner: Mutex<RegionScannerRef>,
scanner: Arc<Mutex<RegionScannerRef>>,
arrow_schema: ArrowSchemaRef,
/// The expected output ordering for the plan.
output_ordering: Option<Vec<PhysicalSortExpr>>,
Expand Down Expand Up @@ -70,7 +70,7 @@ impl RegionScanExec {
let append_mode = scanner_props.append_mode();
let total_rows = scanner_props.total_rows();
Self {
scanner: Mutex::new(scanner),
scanner: Arc::new(Mutex::new(scanner)),
arrow_schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
Expand Down Expand Up @@ -102,9 +102,28 @@ impl RegionScanExec {
}

/// Update the partition ranges of underlying scanner.
pub fn set_partitions(&self, partitions: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)
pub fn with_new_partitions(
&self,
partitions: Vec<Vec<PartitionRange>>,
) -> Result<Self, BoxedError> {
let num_partitions = partitions.len();
let mut properties = self.properties.clone();
properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);

{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)?;
}

Ok(Self {
scanner: self.scanner.clone(),
arrow_schema: self.arrow_schema.clone(),
output_ordering: self.output_ordering.clone(),
metric: self.metric.clone(),
properties,
append_mode: self.append_mode,
total_rows: self.total_rows,
})
}
}

Expand Down

0 comments on commit ed906df

Please sign in to comment.