Skip to content

Commit

Permalink
fix: update properties on updating partitions (#4627)
Browse files Browse the repository at this point in the history
* fix: update properties on updating partitions

Signed-off-by: Ruihang Xia <[email protected]>

* add unit test and handle insufficient ranges

Signed-off-by: Ruihang Xia <[email protected]>

---------

Signed-off-by: Ruihang Xia <[email protected]>
  • Loading branch information
waynexia authored Aug 28, 2024
1 parent 28bf549 commit d5455db
Show file tree
Hide file tree
Showing 3 changed files with 142 additions and 9 deletions.
120 changes: 117 additions & 3 deletions src/query/src/optimizer/parallelize_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -63,9 +63,10 @@ impl ParallelizeScan {
);

// update the partition ranges
region_scan_exec
.set_partitions(partition_ranges)
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
.map_err(|e| DataFusionError::External(e.into_inner()))?;
return Ok(Transformed::yes(Arc::new(new_exec)));
}

// The plan might be modified, but it's modified in-place so we always return
Expand All @@ -80,11 +81,15 @@ impl ParallelizeScan {
/// Distribute [`PartitionRange`]s to each partition.
///
/// Currently we use a simple round-robin strategy to assign ranges to partitions.
/// This method may return fewer partitions than `expected_partition_num` when
/// there are fewer ranges than `expected_partition_num`, but it always returns
/// at least one partition.
fn assign_partition_range(
ranges: Vec<PartitionRange>,
expected_partition_num: usize,
) -> Vec<Vec<PartitionRange>> {
let mut partition_ranges = vec![vec![]; expected_partition_num];
let actual_partition_num = expected_partition_num.min(ranges.len()).max(1);
let mut partition_ranges = vec![vec![]; actual_partition_num];

// round-robin assignment
for (i, range) in ranges.into_iter().enumerate() {
Expand All @@ -95,3 +100,112 @@ impl ParallelizeScan {
partition_ranges
}
}

#[cfg(test)]
mod test {
    use common_time::timestamp::TimeUnit;
    use common_time::Timestamp;

    use super::*;

    /// Exercises `ParallelizeScan::assign_partition_range` with three inputs:
    /// more ranges than partitions, fewer ranges than partitions, and no ranges.
    #[test]
    fn test_assign_partition_range() {
        // Shorthand constructor: start/end are given in seconds, the remaining
        // two arguments fill `estimated_size` and `identifier` verbatim.
        let range_of = |start_sec, end_sec, estimated_size, identifier| PartitionRange {
            start: Timestamp::new(start_sec, TimeUnit::Second),
            end: Timestamp::new(end_sec, TimeUnit::Second),
            estimated_size,
            identifier,
        };

        let ranges = vec![
            range_of(0, 10, 100, 1),
            range_of(10, 20, 200, 2),
            range_of(20, 30, 150, 3),
            range_of(30, 40, 250, 4),
        ];

        // Four ranges into two partitions: ranges 1 and 3 land in the first
        // partition, ranges 2 and 4 in the second.
        let two_way = ParallelizeScan::assign_partition_range(ranges.clone(), 2);
        assert_eq!(
            two_way,
            vec![
                vec![range_of(0, 10, 100, 1), range_of(20, 30, 150, 3)],
                vec![range_of(10, 20, 200, 2), range_of(30, 40, 250, 4)],
            ]
        );

        // Four ranges with five partitions requested: only four partitions
        // are returned, each holding exactly one range.
        let five_way = ParallelizeScan::assign_partition_range(ranges, 5);
        assert_eq!(
            five_way,
            vec![
                vec![range_of(0, 10, 100, 1)],
                vec![range_of(10, 20, 200, 2)],
                vec![range_of(20, 30, 150, 3)],
                vec![range_of(30, 40, 250, 4)],
            ]
        );

        // No ranges with five partitions requested: a single partition is
        // still returned.
        let no_ranges = ParallelizeScan::assign_partition_range(vec![], 5);
        assert_eq!(no_ranges.len(), 1);
    }
}
2 changes: 1 addition & 1 deletion src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,7 @@ impl ScannerPartitioning {
}

/// Represents one data range within a partition
#[derive(Debug, Clone, Copy)]
#[derive(Debug, Clone, Copy, PartialEq, Eq)]
pub struct PartitionRange {
/// Start time of time index column. Inclusive.
pub start: Timestamp,
Expand Down
29 changes: 24 additions & 5 deletions src/table/src/table/scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -41,7 +41,7 @@ use crate::table::metrics::StreamMetrics;
/// A plan to read multiple partitions from a region of a table.
#[derive(Debug)]
pub struct RegionScanExec {
scanner: Mutex<RegionScannerRef>,
scanner: Arc<Mutex<RegionScannerRef>>,
arrow_schema: ArrowSchemaRef,
/// The expected output ordering for the plan.
output_ordering: Option<Vec<PhysicalSortExpr>>,
Expand Down Expand Up @@ -70,7 +70,7 @@ impl RegionScanExec {
let append_mode = scanner_props.append_mode();
let total_rows = scanner_props.total_rows();
Self {
scanner: Mutex::new(scanner),
scanner: Arc::new(Mutex::new(scanner)),
arrow_schema,
output_ordering: None,
metric: ExecutionPlanMetricsSet::new(),
Expand Down Expand Up @@ -102,9 +102,28 @@ impl RegionScanExec {
}

/// Update the partition ranges of underlying scanner.
pub fn set_partitions(&self, partitions: Vec<Vec<PartitionRange>>) -> Result<(), BoxedError> {
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)
pub fn with_new_partitions(
&self,
partitions: Vec<Vec<PartitionRange>>,
) -> Result<Self, BoxedError> {
let num_partitions = partitions.len();
let mut properties = self.properties.clone();
properties.partitioning = Partitioning::UnknownPartitioning(num_partitions);

{
let mut scanner = self.scanner.lock().unwrap();
scanner.prepare(partitions)?;
}

Ok(Self {
scanner: self.scanner.clone(),
arrow_schema: self.arrow_schema.clone(),
output_ordering: self.output_ordering.clone(),
metric: self.metric.clone(),
properties,
append_mode: self.append_mode,
total_rows: self.total_rows,
})
}
}

Expand Down

0 comments on commit d5455db

Please sign in to comment.