Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

feat: initialize partition range from ScanInput #4635

Merged
merged 6 commits into from
Aug 30, 2024
Merged
Show file tree
Hide file tree
Changes from 4 commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 32 additions & 1 deletion src/mito2/src/read/scan_region.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,7 +25,7 @@ use common_time::range::TimestampRange;
use common_time::Timestamp;
use datafusion::physical_plan::DisplayFormatType;
use smallvec::SmallVec;
use store_api::region_engine::RegionScannerRef;
use store_api::region_engine::{PartitionRange, RegionScannerRef};
use store_api::storage::{ScanRequest, TimeSeriesRowSelector};
use table::predicate::{build_time_range_predicate, Predicate};
use tokio::sync::{mpsc, Mutex, Semaphore};
Expand Down Expand Up @@ -705,6 +705,37 @@ impl ScanInput {
let rows_in_memtables: usize = self.memtables.iter().map(|m| m.stats().num_rows()).sum();
rows_in_files + rows_in_memtables
}

/// Retrieves a [`PartitionRange`] for each memtable and file of this input.
///
/// Ranges are emitted for memtables first and then for files. The `identifier`
/// of a range is the position of its source in that combined
/// (memtables, then files) order.
///
/// A memtable that reports no time range is skipped instead of causing a
/// panic. A skipped memtable still consumes its identifier, so the identifiers
/// of all other ranges do not depend on whether any memtable was skipped.
pub(crate) fn partition_ranges(&self) -> Vec<PartitionRange> {
    let mut container = Vec::with_capacity(self.memtables.len() + self.files.len());

    for (id, memtable) in self.memtables.iter().enumerate() {
        // Fetch the stats once and reuse them for every field of the range.
        let stats = memtable.stats();
        // No time range is available for this memtable, so there is nothing
        // to build a range from; skip it rather than unwrapping.
        if let Some((start, end)) = stats.time_range() {
            container.push(PartitionRange {
                start,
                end,
                num_rows: stats.num_rows(),
                identifier: id,
            });
        }
    }

    // File identifiers continue right after the memtable identifiers.
    let first_file_id = self.memtables.len();
    for (offset, file) in self.files.iter().enumerate() {
        let meta = file.meta_ref();
        container.push(PartitionRange {
            start: meta.time_range.0,
            end: meta.time_range.1,
            num_rows: meta.num_rows as usize,
            identifier: first_file_id + offset,
        });
    }

    container
}
}

#[cfg(test)]
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/read/seq_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -67,10 +67,11 @@ impl SeqScan {
/// Creates a new [SeqScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
let mut properties = ScannerProperties::default()
waynexia marked this conversation as resolved.
Show resolved Hide resolved
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));

Self {
Expand Down
3 changes: 2 additions & 1 deletion src/mito2/src/read/unordered_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -59,10 +59,11 @@ impl UnorderedScan {
/// Creates a new [UnorderedScan].
pub(crate) fn new(input: ScanInput) -> Self {
let parallelism = input.parallelism.parallelism.max(1);
let properties = ScannerProperties::default()
let mut properties = ScannerProperties::default()
.with_parallelism(parallelism)
.with_append_mode(input.append_mode)
.with_total_rows(input.total_rows());
properties.partitions = vec![input.partition_ranges()];
let stream_ctx = Arc::new(StreamContext::new(input));

Self {
Expand Down
25 changes: 12 additions & 13 deletions src/query/src/optimizer/parallelize_scan.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,6 @@ impl ParallelizeScan {
debug!(
"Assign {total_range_num} ranges to {expected_partition_num} partitions"
);

// update the partition ranges
let new_exec = region_scan_exec
.with_new_partitions(partition_ranges)
Expand Down Expand Up @@ -114,25 +113,25 @@ mod test {
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
num_rows: 150,
identifier: 3,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
num_rows: 250,
identifier: 4,
},
];
Expand All @@ -146,27 +145,27 @@ mod test {
PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
num_rows: 100,
identifier: 1,
},
PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
num_rows: 150,
identifier: 3,
},
],
vec![
PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
num_rows: 200,
identifier: 2,
},
PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
num_rows: 250,
identifier: 4,
},
],
Expand All @@ -180,25 +179,25 @@ mod test {
vec![PartitionRange {
start: Timestamp::new(0, TimeUnit::Second),
end: Timestamp::new(10, TimeUnit::Second),
estimated_size: 100,
num_rows: 100,
identifier: 1,
}],
vec![PartitionRange {
start: Timestamp::new(10, TimeUnit::Second),
end: Timestamp::new(20, TimeUnit::Second),
estimated_size: 200,
num_rows: 200,
identifier: 2,
}],
vec![PartitionRange {
start: Timestamp::new(20, TimeUnit::Second),
end: Timestamp::new(30, TimeUnit::Second),
estimated_size: 150,
num_rows: 150,
identifier: 3,
}],
vec![PartitionRange {
start: Timestamp::new(30, TimeUnit::Second),
end: Timestamp::new(40, TimeUnit::Second),
estimated_size: 250,
num_rows: 250,
identifier: 4,
}],
];
Expand Down
5 changes: 2 additions & 3 deletions src/store-api/src/region_engine.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,9 +149,8 @@ pub struct PartitionRange {
pub start: Timestamp,
/// End time of time index column. Inclusive.
pub end: Timestamp,
/// Estimate size of this range. Is used to balance ranges between partitions.
/// No base unit, just a number.
pub estimated_size: usize,
/// Number of rows in this range. Used to balance ranges between partitions.
pub num_rows: usize,
/// Identifier of this range. Assigned by storage engine.
pub identifier: usize,
}
Expand Down
Loading