Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Return None when Parquet page indexes are not present in file #6639

Merged
merged 4 commits into from
Nov 24, 2024
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 1 addition & 3 deletions parquet/src/arrow/arrow_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -3584,9 +3584,7 @@ mod tests {
.unwrap();
// Although `Vec<Vec<PageLocation>>` of each row group is empty,
// we should read the file successfully.
// FIXME: this test will fail when metadata parsing returns `None` for missing page
// indexes. https://github.com/apache/arrow-rs/issues/6447
assert!(builder.metadata().offset_index().unwrap()[0].is_empty());
assert!(builder.metadata().offset_index().is_none());
let reader = builder.build().unwrap();
let batches = reader.collect::<Result<Vec<_>, _>>().unwrap();
assert_eq!(batches.len(), 1);
Expand Down
2 changes: 1 addition & 1 deletion parquet/src/arrow/arrow_writer/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1743,7 +1743,7 @@ mod tests {
"Expected a dictionary page"
);

let offset_indexes = read_offset_indexes(&file, column).unwrap();
let offset_indexes = read_offset_indexes(&file, column).unwrap().unwrap();

let page_locations = offset_indexes[0].page_locations.clone();

Expand Down
6 changes: 2 additions & 4 deletions parquet/src/arrow/async_reader/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -928,7 +928,6 @@ mod tests {
use crate::arrow::schema::parquet_to_arrow_schema_and_fields;
use crate::arrow::ArrowWriter;
use crate::file::metadata::ParquetMetaDataReader;
use crate::file::page_index::index_reader;
use crate::file::properties::WriterProperties;
use arrow::compute::kernels::cmp::eq;
use arrow::error::Result as ArrowResult;
Expand Down Expand Up @@ -1566,12 +1565,11 @@ mod tests {
let data = Bytes::from(std::fs::read(path).unwrap());

let metadata = ParquetMetaDataReader::new()
.with_page_indexes(true)
.parse_and_finish(&data)
.unwrap();

let offset_index =
index_reader::read_offset_indexes(&data, metadata.row_group(0).columns())
.expect("reading offset index");
let offset_index = metadata.offset_index().expect("reading offset index")[0].clone();

let mut metadata_builder = metadata.into_builder();
let mut row_groups = metadata_builder.take_row_groups();
Expand Down
15 changes: 0 additions & 15 deletions parquet/src/file/metadata/reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -269,7 +269,6 @@ impl ParquetMetaDataReader {

// Get bounds needed for page indexes (if any are present in the file).
let Some(range) = self.range_for_page_index() else {
self.empty_page_indexes();
return Ok(());
};

Expand Down Expand Up @@ -446,20 +445,6 @@ impl ParquetMetaDataReader {
Ok(())
}

/// Set the column_index and offset_indexes to empty `Vec` for backwards compatibility
///
/// See <https://github.com/apache/arrow-rs/pull/6451> for details
fn empty_page_indexes(&mut self) {
let metadata = self.metadata.as_mut().unwrap();
let num_row_groups = metadata.num_row_groups();
if self.column_index {
metadata.set_column_index(Some(vec![vec![]; num_row_groups]));
}
if self.offset_index {
metadata.set_offset_index(Some(vec![vec![]; num_row_groups]));
}
}

fn range_for_page_index(&self) -> Option<Range<usize>> {
// sanity check
self.metadata.as_ref()?;
Expand Down
48 changes: 26 additions & 22 deletions parquet/src/file/page_index/index_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -43,35 +43,37 @@ pub(crate) fn acc_range(a: Option<Range<usize>>, b: Option<Range<usize>>) -> Opt
///
/// Returns a vector of `index[column_number]`.
///
/// Returns an empty vector if this row group does not contain a
/// [`ColumnIndex`].
/// Returns `None` if this row group does not contain a [`ColumnIndex`].
///
/// See [Page Index Documentation] for more details.
///
/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn read_columns_indexes<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
) -> Result<Vec<Index>, ParquetError> {
) -> Result<Option<Vec<Index>>, ParquetError> {
let fetch = chunks
.iter()
.fold(None, |range, c| acc_range(range, c.column_index_range()));

let fetch = match fetch {
Some(r) => r,
None => return Ok(vec![Index::NONE; chunks.len()]),
None => return Ok(None),
};

let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];

chunks
.iter()
.map(|c| match c.column_index_range() {
Some(r) => decode_column_index(get(r), c.column_type()),
None => Ok(Index::NONE),
})
.collect()
Some(
chunks
.iter()
.map(|c| match c.column_index_range() {
Some(r) => decode_column_index(get(r), c.column_type()),
None => Ok(Index::NONE),
})
.collect(),
)
.transpose()
}

/// Reads [`OffsetIndex`], per-page [`PageLocation`] for all columns of a row
Expand Down Expand Up @@ -116,35 +118,37 @@ pub fn read_pages_locations<R: ChunkReader>(
///
/// Returns a vector of `offset_index[column_number]`.
///
/// Returns an empty vector if this row group does not contain an
/// [`OffsetIndex`].
/// Returns `None` if this row group does not contain an [`OffsetIndex`].
///
/// See [Page Index Documentation] for more details.
///
/// [Page Index Documentation]: https://github.com/apache/parquet-format/blob/master/PageIndex.md
pub fn read_offset_indexes<R: ChunkReader>(
reader: &R,
chunks: &[ColumnChunkMetaData],
) -> Result<Vec<OffsetIndexMetaData>, ParquetError> {
) -> Result<Option<Vec<OffsetIndexMetaData>>, ParquetError> {
let fetch = chunks
.iter()
.fold(None, |range, c| acc_range(range, c.offset_index_range()));

let fetch = match fetch {
Some(r) => r,
None => return Ok(vec![]),
None => return Ok(None),
};

let bytes = reader.get_bytes(fetch.start as _, fetch.end - fetch.start)?;
let get = |r: Range<usize>| &bytes[(r.start - fetch.start)..(r.end - fetch.start)];

chunks
.iter()
.map(|c| match c.offset_index_range() {
Some(r) => decode_offset_index(get(r)),
None => Err(general_err!("missing offset index")),
})
.collect()
Some(
chunks
.iter()
.map(|c| match c.offset_index_range() {
Some(r) => decode_offset_index(get(r)),
None => Err(general_err!("missing offset index")),
})
.collect(),
)
.transpose()
}

pub(crate) fn decode_offset_index(data: &[u8]) -> Result<OffsetIndexMetaData, ParquetError> {
Expand Down
22 changes: 12 additions & 10 deletions parquet/src/file/serialized_reader.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1223,8 +1223,8 @@ mod tests {
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());

// false, true predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
Expand All @@ -1236,8 +1236,8 @@ mod tests {
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());

// false, false predicate
let test_file = get_test_file("alltypes_tiny_pages.parquet");
Expand All @@ -1249,8 +1249,8 @@ mod tests {
let reader = SerializedFileReader::new_with_options(test_file, read_options)?;
let metadata = reader.metadata();
assert_eq!(metadata.num_row_groups(), 0);
assert_eq!(metadata.column_index().unwrap().len(), 0);
assert_eq!(metadata.offset_index().unwrap().len(), 0);
assert!(metadata.column_index().is_none());
assert!(metadata.offset_index().is_none());
Ok(())
}

Expand Down Expand Up @@ -1340,13 +1340,15 @@ mod tests {
let columns = metadata.row_group(0).columns();
let reversed: Vec<_> = columns.iter().cloned().rev().collect();

let a = read_columns_indexes(&test_file, columns).unwrap();
let mut b = read_columns_indexes(&test_file, &reversed).unwrap();
let a = read_columns_indexes(&test_file, columns).unwrap().unwrap();
let mut b = read_columns_indexes(&test_file, &reversed)
.unwrap()
.unwrap();
b.reverse();
assert_eq!(a, b);

let a = read_offset_indexes(&test_file, columns).unwrap();
let mut b = read_offset_indexes(&test_file, &reversed).unwrap();
let a = read_offset_indexes(&test_file, columns).unwrap().unwrap();
let mut b = read_offset_indexes(&test_file, &reversed).unwrap().unwrap();
b.reverse();
assert_eq!(a, b);
}
Expand Down
Loading