Skip to content

Commit

Permalink
Bind read_parquet_metadata API to libcudf instead of pyarrow and ex…
Browse files Browse the repository at this point in the history
…tract `RowGroup` information (#15398)

The `cudf.io.read_parquet_metadata` is now bound to corresponding libcudf API instead of relying on pyarrow. The libcudf API now also returns high level `RowGroup` metadata to solve #11214. Added additional tests and doc updates as well. 

More metadata information such `min, max` values for each column in each row group can also be extracted and returned if needed. Thoughts? 

Recommend: Closing #15320 without merging in favor of this PR.

Authors:
  - Muhammad Haseeb (https://github.com/mhaseeb123)
  - GALI PREM SAGAR (https://github.com/galipremsagar)

Approvers:
  - GALI PREM SAGAR (https://github.com/galipremsagar)
  - Vukasin Milovanovic (https://github.com/vuule)
  - Bradley Dice (https://github.com/bdice)

URL: #15398
  • Loading branch information
mhaseeb123 authored Apr 17, 2024
1 parent 9f2fdf8 commit f222b4a
Show file tree
Hide file tree
Showing 9 changed files with 249 additions and 17 deletions.
41 changes: 38 additions & 3 deletions cpp/include/cudf/io/parquet_metadata.hpp
Original file line number Diff line number Diff line change
@@ -1,5 +1,5 @@
/*
* Copyright (c) 2023, NVIDIA CORPORATION.
* Copyright (c) 2023-2024, NVIDIA CORPORATION.
*
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
Expand Down Expand Up @@ -59,6 +59,13 @@ enum class TypeKind : int8_t {
*/
struct parquet_column_schema {
public:
/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on stack.
*/
explicit parquet_column_schema() = default;

/**
* @brief constructor
*
Expand Down Expand Up @@ -134,6 +141,13 @@ struct parquet_column_schema {
*/
struct parquet_schema {
public:
/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on stack.
*/
explicit parquet_schema() = default;

/**
* @brief constructor
*
Expand Down Expand Up @@ -165,6 +179,15 @@ class parquet_metadata {
public:
/// Key-value metadata in the file footer.
using key_value_metadata = std::unordered_map<std::string, std::string>;
/// row group metadata from each RowGroup element.
using row_group_metadata = std::unordered_map<std::string, int64_t>;

/**
* @brief Default constructor.
*
* This has been added since Cython requires a default constructor to create objects on stack.
*/
explicit parquet_metadata() = default;

/**
* @brief constructor
Expand All @@ -173,15 +196,18 @@ class parquet_metadata {
* @param num_rows number of rows
* @param num_rowgroups number of row groups
* @param file_metadata key-value metadata in the file footer
* @param rg_metadata vector of maps containing metadata for each row group
*/
parquet_metadata(parquet_schema schema,
int64_t num_rows,
size_type num_rowgroups,
key_value_metadata file_metadata)
key_value_metadata file_metadata,
std::vector<row_group_metadata> rg_metadata)
: _schema{std::move(schema)},
_num_rows{num_rows},
_num_rowgroups{num_rowgroups},
_file_metadata{std::move(file_metadata)}
_file_metadata{std::move(file_metadata)},
_rowgroup_metadata{std::move(rg_metadata)}
{
}

Expand All @@ -207,18 +233,27 @@ class parquet_metadata {
* @return Number of row groups
*/
[[nodiscard]] auto num_rowgroups() const { return _num_rowgroups; }

/**
* @brief Returns the Key value metadata in the file footer.
*
* @return Key value metadata as a map
*/
[[nodiscard]] auto const& metadata() const { return _file_metadata; }

/**
* @brief Returns the row group metadata in the file footer.
*
* @return vector of row group metadata as maps
*/
[[nodiscard]] auto const& rowgroup_metadata() const { return _rowgroup_metadata; }

private:
parquet_schema _schema;
int64_t _num_rows;
size_type _num_rowgroups;
key_value_metadata _file_metadata;
std::vector<row_group_metadata> _rowgroup_metadata;
};

/**
Expand Down
3 changes: 2 additions & 1 deletion cpp/src/io/parquet/reader_impl.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -609,7 +609,8 @@ parquet_metadata read_parquet_metadata(host_span<std::unique_ptr<datasource> con
return parquet_metadata{parquet_schema{walk_schema(&metadata, 0)},
metadata.get_num_rows(),
metadata.get_num_row_groups(),
metadata.get_key_value_metadata()[0]};
metadata.get_key_value_metadata()[0],
metadata.get_rowgroup_metadata()};
}

} // namespace cudf::io::parquet::detail
20 changes: 20 additions & 0 deletions cpp/src/io/parquet/reader_impl_helpers.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -560,6 +560,26 @@ ColumnChunkMetaData const& aggregate_reader_metadata::get_column_metadata(size_t
return col->meta_data;
}

std::vector<std::unordered_map<std::string, int64_t>>
aggregate_reader_metadata::get_rowgroup_metadata() const
{
std::vector<std::unordered_map<std::string, int64_t>> rg_metadata;

std::for_each(
per_file_metadata.cbegin(), per_file_metadata.cend(), [&rg_metadata](auto const& pfm) {
std::transform(pfm.row_groups.cbegin(),
pfm.row_groups.cend(),
std::back_inserter(rg_metadata),
[](auto const& rg) {
std::unordered_map<std::string, int64_t> rg_meta_map;
rg_meta_map["num_rows"] = rg.num_rows;
rg_meta_map["total_byte_size"] = rg.total_byte_size;
return rg_meta_map;
});
});
return rg_metadata;
}

std::string aggregate_reader_metadata::get_pandas_index() const
{
// Assumes that all input files have the same metadata
Expand Down
8 changes: 8 additions & 0 deletions cpp/src/io/parquet/reader_impl_helpers.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -166,6 +166,13 @@ class aggregate_reader_metadata {
size_type src_idx,
int schema_idx) const;

/**
* @brief Extracts high-level metadata for all row groups
*
* @return List of maps containing metadata information for each row group
*/
[[nodiscard]] std::vector<std::unordered_map<std::string, int64_t>> get_rowgroup_metadata() const;

[[nodiscard]] auto get_num_rows() const { return num_rows; }

[[nodiscard]] auto get_num_row_groups() const { return num_row_groups; }
Expand All @@ -178,6 +185,7 @@ class aggregate_reader_metadata {
[[nodiscard]] auto const& get_key_value_metadata() const& { return keyval_maps; }

[[nodiscard]] auto&& get_key_value_metadata() && { return std::move(keyval_maps); }

/**
* @brief Gets the concrete nesting depth of output cudf columns
*
Expand Down
32 changes: 32 additions & 0 deletions python/cudf/cudf/_lib/cpp/io/parquet_metadata.pxd
Original file line number Diff line number Diff line change
@@ -0,0 +1,32 @@
# Copyright (c) 2024, NVIDIA CORPORATION.

from libc.stdint cimport int64_t
from libcpp.string cimport string
from libcpp.unordered_map cimport unordered_map
from libcpp.vector cimport vector

cimport cudf._lib.cpp.io.types as cudf_io_types
from cudf._lib.cpp.types cimport size_type


cdef extern from "cudf/io/parquet_metadata.hpp" namespace "cudf::io" nogil:
cdef cppclass parquet_column_schema:
parquet_column_schema() except+
string name() except+
size_type num_children() except+
parquet_column_schema child(int idx) except+
vector[parquet_column_schema] children() except+

cdef cppclass parquet_schema:
parquet_schema() except+
parquet_column_schema root() except+

cdef cppclass parquet_metadata:
parquet_metadata() except+
parquet_schema schema() except+
int64_t num_rows() except+
size_type num_rowgroups() except+
unordered_map[string, string] metadata() except+
vector[unordered_map[string, int64_t]] rowgroup_metadata() except+

cdef parquet_metadata read_parquet_metadata(cudf_io_types.source_info src) except+
69 changes: 69 additions & 0 deletions python/cudf/cudf/_lib/parquet.pyx
Original file line number Diff line number Diff line change
Expand Up @@ -46,6 +46,10 @@ from cudf._lib.cpp.io.parquet cimport (
read_parquet as parquet_reader,
write_parquet as parquet_writer,
)
from cudf._lib.cpp.io.parquet_metadata cimport (
parquet_metadata,
read_parquet_metadata as parquet_metadata_reader,
)
from cudf._lib.cpp.io.types cimport column_in_metadata, table_input_metadata
from cudf._lib.cpp.table.table_view cimport table_view
from cudf._lib.cpp.types cimport data_type, size_type
Expand Down Expand Up @@ -316,6 +320,71 @@ cpdef read_parquet(filepaths_or_buffers, columns=None, row_groups=None,
df._data.label_dtype = cudf.dtype(column_index_type)
return df

cpdef read_parquet_metadata(filepaths_or_buffers):
"""
Cython function to call into libcudf API, see `read_parquet_metadata`.
See Also
--------
cudf.io.parquet.read_parquet
cudf.io.parquet.to_parquet
"""
# Convert NativeFile buffers to NativeFileDatasource
for i, datasource in enumerate(filepaths_or_buffers):
if isinstance(datasource, NativeFile):
filepaths_or_buffers[i] = NativeFileDatasource(datasource)

cdef cudf_io_types.source_info source = make_source_info(filepaths_or_buffers)

args = move(source)

cdef parquet_metadata c_result

# Read Parquet metadata
with nogil:
c_result = move(parquet_metadata_reader(args))

# access and return results
num_rows = c_result.num_rows()
num_rowgroups = c_result.num_rowgroups()

# extract row group metadata and sanitize keys
row_group_metadata = [{k.decode(): v for k, v in metadata}
for metadata in c_result.rowgroup_metadata()]

# read all column names including index column, if any
col_names = [info.name().decode() for info in c_result.schema().root().children()]

# access the Parquet file_footer to find the index
index_col = None
cdef unordered_map[string, string] file_footer = c_result.metadata()

# get index column name(s)
index_col_names = None
json_str = file_footer[b'pandas'].decode('utf-8')
meta = None
if json_str != "":
meta = json.loads(json_str)
file_is_range_index, index_col, _ = _parse_metadata(meta)
if not file_is_range_index and index_col is not None \
and index_col_names is None:
index_col_names = {}
for idx_col in index_col:
for c in meta['columns']:
if c['field_name'] == idx_col:
index_col_names[idx_col] = c['name']

# remove the index column from the list of column names
# only if index_col_names is not None
if index_col_names is not None:
col_names = [name for name in col_names if name not in index_col_names]

# num_columns = length of list(col_names)
num_columns = len(col_names)

# return the metadata
return num_rows, num_rowgroups, col_names, num_columns, row_group_metadata


@acquire_spill_lock()
def write_parquet(
Expand Down
42 changes: 35 additions & 7 deletions python/cudf/cudf/io/parquet.py
Original file line number Diff line number Diff line change
Expand Up @@ -267,17 +267,45 @@ def write_to_dataset(

@ioutils.doc_read_parquet_metadata()
@_cudf_nvtx_annotate
def read_parquet_metadata(path):
def read_parquet_metadata(filepath_or_buffer):
"""{docstring}"""
import pyarrow.parquet as pq
# Multiple sources are passed as a list. If a single source is passed,
# wrap it in a list for unified processing downstream.
if not is_list_like(filepath_or_buffer):
filepath_or_buffer = [filepath_or_buffer]

pq_file = pq.ParquetFile(path)
# Start by trying to construct a filesystem object
fs, paths = ioutils._get_filesystem_and_paths(
path_or_data=filepath_or_buffer, storage_options=None
)

num_rows = pq_file.metadata.num_rows
num_row_groups = pq_file.num_row_groups
col_names = pq_file.schema.names
# Check if filepath or buffer
filepath_or_buffer = paths if paths else filepath_or_buffer

# List of filepaths or buffers
filepaths_or_buffers = []

for source in filepath_or_buffer:
tmp_source, compression = ioutils.get_reader_filepath_or_buffer(
path_or_data=source,
compression=None,
fs=fs,
use_python_file_object=True,
open_file_options=None,
storage_options=None,
bytes_per_thread=None,
)

if compression is not None:
raise ValueError(
"URL content-encoding decompression is not supported"
)
if isinstance(tmp_source, list):
filepath_or_buffer.extend(tmp_source)
else:
filepaths_or_buffers.append(tmp_source)

return num_rows, num_row_groups, col_names
return libparquet.read_parquet_metadata(filepaths_or_buffers)


@_cudf_nvtx_annotate
Expand Down
Loading

0 comments on commit f222b4a

Please sign in to comment.