Skip to content

Commit

Permalink
fix(fulltext-search): prune rows in row group forget to take remainder (
Browse files Browse the repository at this point in the history
#4447)

* fix(fulltext-search): prune rows in row group forget to take remainder

Signed-off-by: Zhenchi <[email protected]>

* test: add unit test

Signed-off-by: Zhenchi <[email protected]>

---------

Signed-off-by: Zhenchi <[email protected]>
  • Loading branch information
zhongzc authored Jul 29, 2024
1 parent b298b35 commit cb94bd4
Show file tree
Hide file tree
Showing 2 changed files with 57 additions and 25 deletions.
78 changes: 55 additions & 23 deletions src/mito2/src/sst/parquet/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,7 +14,7 @@

//! Parquet reader.
use std::collections::{BTreeMap, VecDeque};
use std::collections::{BTreeMap, BTreeSet, VecDeque};
use std::ops::Range;
use std::sync::Arc;
use std::time::{Duration, Instant};
Expand Down Expand Up @@ -408,13 +408,11 @@ impl ParquetReaderBuilder {
}
};

let row_group_to_row_ids = apply_res
.into_iter()
.group_by(|row_id| *row_id as usize / row_group_size);

let row_group_to_row_ids =
Self::group_row_ids(apply_res, row_group_size, parquet_meta.num_row_groups());
Self::prune_row_groups_by_rows(
parquet_meta,
row_group_to_row_ids.into_iter(),
row_group_to_row_ids,
output,
&mut metrics.num_row_groups_fulltext_index_filtered,
&mut metrics.num_rows_in_row_group_fulltext_index_filtered,
Expand All @@ -423,6 +421,33 @@ impl ParquetReaderBuilder {
true
}

/// Groups row IDs into row groups, with each group's row IDs starting from 0.
fn group_row_ids(
row_ids: BTreeSet<u32>,
row_group_size: usize,
num_row_groups: usize,
) -> Vec<(usize, Vec<usize>)> {
let est_rows_per_group = row_ids.len() / num_row_groups;

let mut row_group_to_row_ids: Vec<(usize, Vec<usize>)> = Vec::with_capacity(num_row_groups);
for row_id in row_ids {
let row_group_id = row_id as usize / row_group_size;
let row_id_in_group = row_id as usize % row_group_size;

if let Some((rg_id, row_ids)) = row_group_to_row_ids.last_mut()
&& *rg_id == row_group_id
{
row_ids.push(row_id_in_group);
} else {
let mut row_ids = Vec::with_capacity(est_rows_per_group);
row_ids.push(row_id_in_group);
row_group_to_row_ids.push((row_group_id, row_ids));
}
}

row_group_to_row_ids
}

/// Applies index to prune row groups.
///
/// TODO(zhongzc): Devise a mechanism to enforce the non-use of indices
Expand Down Expand Up @@ -540,7 +565,7 @@ impl ParquetReaderBuilder {
/// a list of row ids to keep.
fn prune_row_groups_by_rows(
parquet_meta: &ParquetMetaData,
rows_in_row_groups: impl Iterator<Item = (usize, impl Iterator<Item = u32>)>,
rows_in_row_groups: Vec<(usize, Vec<usize>)>,
output: &mut BTreeMap<usize, Option<RowSelection>>,
filtered_row_groups: &mut usize,
filtered_rows: &mut usize,
Expand All @@ -560,7 +585,8 @@ impl ParquetReaderBuilder {
.as_ref()
.map_or(total_row_count, |s| s.row_count());

let new_selection = row_selection_from_sorted_row_ids(row_ids, total_row_count);
let new_selection =
row_selection_from_sorted_row_ids(row_ids.into_iter(), total_row_count);
let intersected_selection = intersect_row_selections(selection, Some(new_selection));

let num_rows_after = intersected_selection
Expand Down Expand Up @@ -1159,14 +1185,26 @@ mod tests {
ParquetMetaData::new(file_meta, row_groups)
}

#[test]
fn test_group_row_ids() {
let row_ids = [0, 1, 2, 5, 6, 7, 8, 12].into_iter().collect();
let row_group_size = 5;
let num_row_groups = 3;

let row_group_to_row_ids =
ParquetReaderBuilder::group_row_ids(row_ids, row_group_size, num_row_groups);

assert_eq!(
row_group_to_row_ids,
vec![(0, vec![0, 1, 2]), (1, vec![0, 1, 2, 3]), (2, vec![2])]
);
}

#[test]
fn prune_row_groups_by_rows_from_empty() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

let rows_in_row_groups = [
(0, [5, 6, 7, 8, 9].into_iter()),
(2, [0, 1, 2, 3, 4].into_iter()),
];
let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

// The original output is empty. No row groups are pruned.
let mut output = BTreeMap::new();
Expand All @@ -1175,7 +1213,7 @@ mod tests {

ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups.into_iter(),
rows_in_row_groups,
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
Expand All @@ -1190,10 +1228,7 @@ mod tests {
fn prune_row_groups_by_rows_from_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

let rows_in_row_groups = [
(0, [5, 6, 7, 8, 9].into_iter()),
(2, [0, 1, 2, 3, 4].into_iter()),
];
let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

// The original output is full.
let mut output = BTreeMap::from([(0, None), (1, None), (2, None)]);
Expand All @@ -1202,7 +1237,7 @@ mod tests {

ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups.into_iter(),
rows_in_row_groups,
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
Expand All @@ -1229,10 +1264,7 @@ mod tests {
fn prune_row_groups_by_rows_from_not_full() {
let parquet_meta = mock_parquet_metadata_from_row_groups(vec![10, 10, 5]);

let rows_in_row_groups = [
(0, [5, 6, 7, 8, 9].into_iter()),
(2, [0, 1, 2, 3, 4].into_iter()),
];
let rows_in_row_groups = vec![(0, vec![5, 6, 7, 8, 9]), (2, vec![0, 1, 2, 3, 4])];

// The original output is not full.
let mut output = BTreeMap::from([
Expand All @@ -1257,7 +1289,7 @@ mod tests {

ParquetReaderBuilder::prune_row_groups_by_rows(
&parquet_meta,
rows_in_row_groups.into_iter(),
rows_in_row_groups,
&mut output,
&mut filtered_row_groups,
&mut filtered_rows,
Expand Down
4 changes: 2 additions & 2 deletions src/mito2/src/sst/parquet/row_selection.rs
Original file line number Diff line number Diff line change
Expand Up @@ -53,14 +53,14 @@ pub(crate) fn row_selection_from_row_ranges(
/// Note: the input iterator must be sorted in ascending order and
/// contain unique row IDs in the range [0, total_row_count).
pub(crate) fn row_selection_from_sorted_row_ids(
row_ids: impl Iterator<Item = u32>,
row_ids: impl Iterator<Item = usize>,
total_row_count: usize,
) -> RowSelection {
let mut selectors: Vec<RowSelector> = Vec::new();
let mut last_processed_end = 0;

for row_id in row_ids {
let start = row_id as usize;
let start = row_id;
let end = start + 1;

if start > last_processed_end {
Expand Down

0 comments on commit cb94bd4

Please sign in to comment.