From 165d756f7f7cb558d1cab62a81a1c91368648d12 Mon Sep 17 00:00:00 2001 From: Matthew Murray <41342305+Matt711@users.noreply.github.com> Date: Tue, 26 Nov 2024 13:38:22 -0500 Subject: [PATCH] Migrate ORC Writer to pylibcudf (#17310) Apart of #15162. Authors: - Matthew Murray (https://github.com/Matt711) Approvers: - Lawrence Mitchell (https://github.com/wence-) - Matthew Roeschke (https://github.com/mroeschke) URL: https://github.com/rapidsai/cudf/pull/17310 --- python/cudf/cudf/_lib/orc.pyx | 167 ++++--- python/pylibcudf/pylibcudf/io/orc.pxd | 65 ++- python/pylibcudf/pylibcudf/io/orc.pyi | 51 ++- python/pylibcudf/pylibcudf/io/orc.pyx | 413 +++++++++++++++++- python/pylibcudf/pylibcudf/io/types.pxd | 17 +- python/pylibcudf/pylibcudf/io/types.pyi | 22 +- python/pylibcudf/pylibcudf/io/types.pyx | 61 ++- .../pylibcudf/pylibcudf/tests/io/test_orc.py | 62 +++ .../pylibcudf/tests/io/test_types.py | 28 ++ 9 files changed, 762 insertions(+), 124 deletions(-) create mode 100644 python/pylibcudf/pylibcudf/tests/io/test_types.py diff --git a/python/cudf/cudf/_lib/orc.pyx b/python/cudf/cudf/_lib/orc.pyx index 32a5e463916..c829cac6409 100644 --- a/python/cudf/cudf/_lib/orc.pyx +++ b/python/cudf/cudf/_lib/orc.pyx @@ -3,11 +3,9 @@ from libc.stdint cimport int64_t from libcpp cimport bool, int from libcpp.map cimport map -from libcpp.memory cimport unique_ptr from libcpp.string cimport string -from libcpp.utility cimport move from libcpp.vector cimport vector - +import itertools from collections import OrderedDict try: @@ -16,23 +14,10 @@ except ImportError: import json cimport pylibcudf.libcudf.lists.lists_column_view as cpp_lists_column_view -from pylibcudf.libcudf.io.data_sink cimport data_sink -from pylibcudf.libcudf.io.orc cimport ( - chunked_orc_writer_options, - orc_chunked_writer, - orc_writer_options, - write_orc as libcudf_write_orc, -) -from pylibcudf.libcudf.io.types cimport ( - column_in_metadata, - sink_info, - table_input_metadata, -) -from pylibcudf.libcudf.table.table_view cimport table_view from cudf._lib.column cimport Column -from cudf._lib.io.utils cimport make_sink_info, update_col_struct_field_names -from cudf._lib.utils cimport data_from_pylibcudf_io, table_view_from_table +from cudf._lib.io.utils cimport update_col_struct_field_names +from cudf._lib.utils cimport data_from_pylibcudf_io import pylibcudf as plc @@ -40,7 +25,8 @@ import cudf from cudf._lib.types import SUPPORTED_NUMPY_TO_PYLIBCUDF_TYPES from cudf._lib.utils import _index_level_name, generate_pandas_metadata from cudf.core.buffer import acquire_spill_lock - +from pylibcudf.io.types cimport TableInputMetadata, SinkInfo, ColumnInMetadata +from pylibcudf.io.orc cimport OrcChunkedWriter # TODO: Consider inlining this function since it seems to only be used in one place. cpdef read_parsed_orc_statistics(filepath_or_buffer): @@ -246,36 +232,33 @@ def write_orc( -------- cudf.read_orc """ - cdef unique_ptr[data_sink] data_sink_c - cdef sink_info sink_info_c = make_sink_info(path_or_buf, data_sink_c) - cdef table_input_metadata tbl_meta - cdef map[string, string] user_data - user_data[str.encode("pandas")] = str.encode(generate_pandas_metadata( - table, index) - ) - + user_data = {} + user_data["pandas"] = generate_pandas_metadata(table, index) if index is True or ( index is None and not isinstance(table._index, cudf.RangeIndex) ): - tv = table_view_from_table(table) - tbl_meta = table_input_metadata(tv) + columns = table._columns if table._index is None else [ + *table.index._columns, *table._columns + ] + plc_table = plc.Table([col.to_pylibcudf(mode="read") for col in columns]) + tbl_meta = TableInputMetadata(plc_table) for level, idx_name in enumerate(table._index.names): tbl_meta.column_metadata[level].set_name( - str.encode( - _index_level_name(idx_name, level, table._column_names) - ) + _index_level_name(idx_name, level, table._column_names) ) num_index_cols_meta = len(table._index.names) else: - tv = table_view_from_table(table, ignore_index=True) - tbl_meta = table_input_metadata(tv) + plc_table = plc.Table( + [col.to_pylibcudf(mode="read") for col in table._columns] + ) + tbl_meta = TableInputMetadata(plc_table) num_index_cols_meta = 0 if cols_as_map_type is not None: cols_as_map_type = set(cols_as_map_type) for i, name in enumerate(table._column_names, num_index_cols_meta): - tbl_meta.column_metadata[i].set_name(name.encode()) + tbl_meta.column_metadata[i].set_name(name) _set_col_children_metadata( table[name]._column, tbl_meta.column_metadata[i], @@ -283,24 +266,24 @@ def write_orc( and (name in cols_as_map_type), ) - cdef orc_writer_options c_orc_writer_options = move( - orc_writer_options.builder( - sink_info_c, tv - ).metadata(tbl_meta) - .key_value_metadata(move(user_data)) + options = ( + plc.io.orc.OrcWriterOptions.builder( + plc.io.SinkInfo([path_or_buf]), plc_table + ) + .metadata(tbl_meta) + .key_value_metadata(user_data) .compression(_get_comp_type(compression)) .enable_statistics(_get_orc_stat_freq(statistics)) .build() ) if stripe_size_bytes is not None: - c_orc_writer_options.set_stripe_size_bytes(stripe_size_bytes) + options.set_stripe_size_bytes(stripe_size_bytes) if stripe_size_rows is not None: - c_orc_writer_options.set_stripe_size_rows(stripe_size_rows) + options.set_stripe_size_rows(stripe_size_rows) if row_index_stride is not None: - c_orc_writer_options.set_row_index_stride(row_index_stride) + options.set_row_index_stride(row_index_stride) - with nogil: - libcudf_write_orc(c_orc_writer_options) + plc.io.orc.write_orc(options) cdef int64_t get_skiprows_arg(object arg) except*: @@ -326,13 +309,12 @@ cdef class ORCWriter: cudf.io.orc.to_orc """ cdef bool initialized - cdef unique_ptr[orc_chunked_writer] writer - cdef sink_info sink - cdef unique_ptr[data_sink] _data_sink + cdef OrcChunkedWriter writer + cdef SinkInfo sink cdef str statistics cdef object compression cdef object index - cdef table_input_metadata tbl_meta + cdef TableInputMetadata tbl_meta cdef object cols_as_map_type cdef object stripe_size_bytes cdef object stripe_size_rows @@ -347,8 +329,7 @@ cdef class ORCWriter: object stripe_size_bytes=None, object stripe_size_rows=None, object row_index_stride=None): - - self.sink = make_sink_info(path, self._data_sink) + self.sink = plc.io.SinkInfo([path]) self.statistics = statistics self.compression = compression self.index = index @@ -368,17 +349,21 @@ cdef class ORCWriter: table._index.name is not None or isinstance(table._index, cudf.core.multiindex.MultiIndex) ) - tv = table_view_from_table(table, not keep_index) + if keep_index: + columns = [ + col.to_pylibcudf(mode="read") + for col in itertools.chain(table.index._columns, table._columns) + ] + else: + columns = [col.to_pylibcudf(mode="read") for col in table._columns] - with nogil: - self.writer.get()[0].write(tv) + self.writer.write(plc.Table(columns)) def close(self): if not self.initialized: return - with nogil: - self.writer.get()[0].close() + self.writer.close() def __dealloc__(self): self.close() @@ -387,32 +372,47 @@ cdef class ORCWriter: """ Prepare all the values required to build the chunked_orc_writer_options anb creates a writer""" - cdef table_view tv num_index_cols_meta = 0 - self.tbl_meta = table_input_metadata( - table_view_from_table(table, ignore_index=True), + plc_table = plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in table._columns + ] ) + self.tbl_meta = TableInputMetadata(plc_table) if self.index is not False: if isinstance(table._index, cudf.core.multiindex.MultiIndex): - tv = table_view_from_table(table) - self.tbl_meta = table_input_metadata(tv) + plc_table = plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in itertools.chain(table.index._columns, table._columns) + ] + ) + self.tbl_meta = TableInputMetadata(plc_table) for level, idx_name in enumerate(table._index.names): self.tbl_meta.column_metadata[level].set_name( - (str.encode(idx_name)) + idx_name ) num_index_cols_meta = len(table._index.names) else: if table._index.name is not None: - tv = table_view_from_table(table) - self.tbl_meta = table_input_metadata(tv) + plc_table = plc.Table( + [ + col.to_pylibcudf(mode="read") + for col in itertools.chain( + table.index._columns, table._columns + ) + ] + ) + self.tbl_meta = TableInputMetadata(plc_table) self.tbl_meta.column_metadata[0].set_name( - str.encode(table._index.name) + table._index.name ) num_index_cols_meta = 1 for i, name in enumerate(table._column_names, num_index_cols_meta): - self.tbl_meta.column_metadata[i].set_name(name.encode()) + self.tbl_meta.column_metadata[i].set_name(name) _set_col_children_metadata( table[name]._column, self.tbl_meta.column_metadata[i], @@ -420,38 +420,37 @@ cdef class ORCWriter: and (name in self.cols_as_map_type), ) - cdef map[string, string] user_data + user_data = {} pandas_metadata = generate_pandas_metadata(table, self.index) - user_data[str.encode("pandas")] = str.encode(pandas_metadata) - - cdef chunked_orc_writer_options c_opts = move( - chunked_orc_writer_options.builder(self.sink) - .metadata(self.tbl_meta) - .key_value_metadata(move(user_data)) - .compression(_get_comp_type(self.compression)) - .enable_statistics(_get_orc_stat_freq(self.statistics)) - .build() - ) + user_data["pandas"] = pandas_metadata + + options = ( + plc.io.orc.ChunkedOrcWriterOptions.builder(self.sink) + .metadata(self.tbl_meta) + .key_value_metadata(user_data) + .compression(_get_comp_type(self.compression)) + .enable_statistics(_get_orc_stat_freq(self.statistics)) + .build() + ) if self.stripe_size_bytes is not None: - c_opts.set_stripe_size_bytes(self.stripe_size_bytes) + options.set_stripe_size_bytes(self.stripe_size_bytes) if self.stripe_size_rows is not None: - c_opts.set_stripe_size_rows(self.stripe_size_rows) + options.set_stripe_size_rows(self.stripe_size_rows) if self.row_index_stride is not None: - c_opts.set_row_index_stride(self.row_index_stride) + options.set_row_index_stride(self.row_index_stride) - with nogil: - self.writer.reset(new orc_chunked_writer(c_opts)) + self.writer = plc.io.orc.OrcChunkedWriter.from_options(options) self.initialized = True cdef _set_col_children_metadata(Column col, - column_in_metadata& col_meta, + ColumnInMetadata col_meta, list_column_as_map=False): if isinstance(col.dtype, cudf.StructDtype): for i, (child_col, name) in enumerate( zip(col.children, list(col.dtype.fields)) ): - col_meta.child(i).set_name(name.encode()) + col_meta.child(i).set_name(name) _set_col_children_metadata( child_col, col_meta.child(i), list_column_as_map ) diff --git a/python/pylibcudf/pylibcudf/io/orc.pxd b/python/pylibcudf/pylibcudf/io/orc.pxd index b111d617b1b..671f0692444 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pxd +++ b/python/pylibcudf/pylibcudf/io/orc.pxd @@ -4,15 +4,33 @@ from libcpp cimport bool from libcpp.optional cimport optional from libcpp.string cimport string from libcpp.vector cimport vector -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from libcpp.memory cimport unique_ptr +from libcpp.map cimport map +from pylibcudf.io.types cimport ( + SourceInfo, + SinkInfo, + TableWithMetadata, + TableInputMetadata, +) from pylibcudf.libcudf.io.orc_metadata cimport ( column_statistics, parsed_orc_statistics, statistics_type, ) +from pylibcudf.libcudf.io.orc cimport ( + orc_chunked_writer, + orc_writer_options, + orc_writer_options_builder, + chunked_orc_writer_options, + chunked_orc_writer_options_builder, +) from pylibcudf.libcudf.types cimport size_type from pylibcudf.types cimport DataType - +from pylibcudf.table cimport Table +from pylibcudf.libcudf.io.types cimport ( + compression_type, + statistics_freq, +) cpdef TableWithMetadata read_orc( SourceInfo source_info, @@ -48,3 +66,46 @@ cdef class ParsedOrcStatistics: cpdef ParsedOrcStatistics read_parsed_orc_statistics( SourceInfo source_info ) + +cdef class OrcWriterOptions: + cdef orc_writer_options c_obj + cdef Table table + cdef SinkInfo sink + cpdef void set_stripe_size_bytes(self, size_t size_bytes) + cpdef void set_stripe_size_rows(self, size_type size_rows) + cpdef void set_row_index_stride(self, size_type stride) + +cdef class OrcWriterOptionsBuilder: + cdef orc_writer_options_builder c_obj + cdef Table table + cdef SinkInfo sink + cpdef OrcWriterOptionsBuilder compression(self, compression_type comp) + cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val) + cpdef OrcWriterOptionsBuilder key_value_metadata(self, dict kvm) + cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta) + cpdef OrcWriterOptions build(self) + +cpdef void write_orc(OrcWriterOptions options) + +cdef class OrcChunkedWriter: + cdef unique_ptr[orc_chunked_writer] c_obj + cpdef void close(self) + cpdef void write(self, Table table) + +cdef class ChunkedOrcWriterOptions: + cdef chunked_orc_writer_options c_obj + cdef SinkInfo sink + cpdef void set_stripe_size_bytes(self, size_t size_bytes) + cpdef void set_stripe_size_rows(self, size_type size_rows) + cpdef void set_row_index_stride(self, size_type stride) + +cdef class ChunkedOrcWriterOptionsBuilder: + cdef chunked_orc_writer_options_builder c_obj + cdef SinkInfo sink + cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp) + cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val) + cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata( + self, dict kvm + ) + cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta) + cpdef ChunkedOrcWriterOptions build(self) diff --git a/python/pylibcudf/pylibcudf/io/orc.pyi b/python/pylibcudf/pylibcudf/io/orc.pyi index 4cf87f1a832..516f97981e9 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyi +++ b/python/pylibcudf/pylibcudf/io/orc.pyi @@ -1,8 +1,16 @@ # Copyright (c) 2024, NVIDIA CORPORATION. -from typing import Any +from typing import Any, Self -from pylibcudf.io.types import SourceInfo, TableWithMetadata +from pylibcudf.io.types import ( + CompressionType, + SinkInfo, + SourceInfo, + StatisticsFreq, + TableInputMetadata, + TableWithMetadata, +) +from pylibcudf.table import Table from pylibcudf.types import DataType def read_orc( @@ -39,3 +47,42 @@ class ParsedOrcStatistics: def read_parsed_orc_statistics( source_info: SourceInfo, ) -> ParsedOrcStatistics: ... + +class OrcWriterOptions: + def __init__(self): ... + def set_stripe_size_bytes(self, size_bytes: int) -> None: ... + def set_stripe_size_rows(self, size_rows: int) -> None: ... + def set_row_index_stride(self, stride: int) -> None: ... + @staticmethod + def builder(sink: SinkInfo, table: Table) -> OrcWriterOptionsBuilder: ... + +class OrcWriterOptionsBuilder: + def __init__(self): ... + def compression(self, comp: CompressionType) -> Self: ... + def enable_statistics(self, val: StatisticsFreq) -> Self: ... + def key_value_metadata(self, kvm: dict[str, str]) -> Self: ... + def metadata(self, meta: TableWithMetadata) -> Self: ... + def build(self) -> OrcWriterOptions: ... + +def write_orc(options: OrcWriterOptions) -> None: ... + +class OrcChunkedWriter: + def __init__(self): ... + def close(self) -> None: ... + def write(self, table: Table) -> None: ... + +class ChunkedOrcWriterOptions: + def __init__(self): ... + def set_stripe_size_bytes(self, size_bytes: int) -> None: ... + def set_stripe_size_rows(self, size_rows: int) -> None: ... + def set_row_index_stride(self, stride: int) -> None: ... + @staticmethod + def builder(sink: SinkInfo) -> ChunkedOrcWriterOptionsBuilder: ... + +class ChunkedOrcWriterOptionsBuilder: + def __init__(self): ... + def compression(self, comp: CompressionType) -> Self: ... + def enable_statistics(self, val: StatisticsFreq) -> Self: ... + def key_value_metadata(self, kvm: dict[str, str]) -> Self: ... + def metadata(self, meta: TableInputMetadata) -> Self: ... + def build(self) -> ChunkedOrcWriterOptions: ... diff --git a/python/pylibcudf/pylibcudf/io/orc.pyx b/python/pylibcudf/pylibcudf/io/orc.pyx index 4270f5b4f95..63eab4a9634 100644 --- a/python/pylibcudf/pylibcudf/io/orc.pyx +++ b/python/pylibcudf/pylibcudf/io/orc.pyx @@ -6,10 +6,11 @@ from libcpp.vector cimport vector import datetime -from pylibcudf.io.types cimport SourceInfo, TableWithMetadata +from pylibcudf.io.types cimport SourceInfo, TableWithMetadata, SinkInfo from pylibcudf.libcudf.io.orc cimport ( orc_reader_options, read_orc as cpp_read_orc, + write_orc as cpp_write_orc, ) from pylibcudf.libcudf.io.orc_metadata cimport ( binary_statistics, @@ -29,12 +30,27 @@ from pylibcudf.libcudf.io.types cimport table_with_metadata from pylibcudf.libcudf.types cimport size_type from pylibcudf.types cimport DataType from pylibcudf.variant cimport get_if, holds_alternative +from pylibcudf.libcudf.io.types cimport ( + compression_type, + statistics_freq, +) +from pylibcudf.libcudf.io.orc cimport ( + orc_chunked_writer, + orc_writer_options, + chunked_orc_writer_options, +) __all__ = [ "OrcColumnStatistics", "ParsedOrcStatistics", "read_orc", "read_parsed_orc_statistics", + "write_orc", + "OrcWriterOptions", + "OrcWriterOptionsBuilder", + "OrcChunkedWriter", + "ChunkedOrcWriterOptions", + "ChunkedOrcWriterOptionsBuilder", ] cdef class OrcColumnStatistics: @@ -310,3 +326,398 @@ cpdef ParsedOrcStatistics read_parsed_orc_statistics( cpp_read_parsed_orc_statistics(source_info.c_obj) ) return ParsedOrcStatistics.from_libcudf(parsed) + + +cdef class OrcWriterOptions: + cpdef void set_stripe_size_bytes(self, size_t size_bytes): + """ + Sets the maximum stripe size, in bytes. + + For details, see :cpp:func:`cudf::io::orc_writer_options::set_stripe_size_bytes` + + Parameters + ---------- + size_bytes: size_t + Sets the maximum stripe size, in bytes. + + Returns + ------- + None + """ + self.c_obj.set_stripe_size_bytes(size_bytes) + + cpdef void set_stripe_size_rows(self, size_type size_rows): + """ + Sets the maximum stripe size, in rows. + + If the stripe size is smaller that the row group size, + row group size will be reduced to math the stripe size. + + For details, see :cpp:func:`cudf::io::orc_writer_options::set_stripe_size_rows` + + Parameters + ---------- + size_bytes: size_type + Maximum stripe size, in rows to be set + + Returns + ------- + None + """ + self.c_obj.set_stripe_size_rows(size_rows) + + cpdef void set_row_index_stride(self, size_type stride): + """ + Sets the row index stride. + + Rounded down to a multiple of 8. + + For details, see :cpp:func:`cudf::io::orc_writer_options::set_row_index_stride` + + Parameters + ---------- + size_bytes: size_type + Maximum stripe size, in rows to be set + + Returns + ------- + None + """ + self.c_obj.set_row_index_stride(stride) + + @staticmethod + def builder(SinkInfo sink, Table table): + """ + Create builder to create OrcWriterOptions. + + For details, see :cpp:func:`cudf::io::orc_writer_options::builder` + + Parameters + ---------- + sink: SinkInfo + The sink used for writer output + table: Table + Table to be written to output + + Returns + ------- + OrcWriterOptionsBuilder + """ + cdef OrcWriterOptionsBuilder orc_builder = OrcWriterOptionsBuilder.__new__( + OrcWriterOptionsBuilder + ) + orc_builder.c_obj = orc_writer_options.builder(sink.c_obj, table.view()) + orc_builder.table = table + orc_builder.sink = sink + return orc_builder + + +cdef class OrcWriterOptionsBuilder: + cpdef OrcWriterOptionsBuilder compression(self, compression_type comp): + """ + Sets compression type. + + For details, see :cpp:func:`cudf::io::orc_writer_options_builder::compression` + + Parameters + ---------- + comp: CompressionType + The compression type to use + + Returns + ------- + OrcWriterOptionsBuilder + """ + self.c_obj.compression(comp) + return self + + cpdef OrcWriterOptionsBuilder enable_statistics(self, statistics_freq val): + """ + Choose granularity of column statistics to be written. + + For details, see :cpp:func:`enable_statistics` + + Parameters + ---------- + val: StatisticsFreq + Level of statistics collection + + Returns + ------- + OrcWriterOptionsBuilder + """ + self.c_obj.enable_statistics(val) + return self + + cpdef OrcWriterOptionsBuilder key_value_metadata(self, dict kvm): + """ + Sets Key-Value footer metadata. + + Parameters + ---------- + kvm: dict + Key-Value footer metadata + + Returns + ------- + OrcWriterOptionsBuilder + """ + self.c_obj.key_value_metadata( + {key.encode(): value.encode() for key, value in kvm.items()} + ) + return self + + cpdef OrcWriterOptionsBuilder metadata(self, TableInputMetadata meta): + """ + Sets associated metadata. + + For details, see :cpp:func:`cudf::io::orc_writer_options_builder::metadata` + + Parameters + ---------- + meta: TableInputMetadata + Associated metadata + + Returns + ------- + OrcWriterOptionsBuilder + """ + self.c_obj.metadata(meta.c_obj) + return self + + cpdef OrcWriterOptions build(self): + """Moves the ORC writer options builder""" + cdef OrcWriterOptions orc_options = OrcWriterOptions.__new__( + OrcWriterOptions + ) + orc_options.c_obj = move(self.c_obj.build()) + orc_options.table = self.table + orc_options.sink = self.sink + return orc_options + + +cpdef void write_orc(OrcWriterOptions options): + """ + Write to ORC format. + + The table to write, output paths, and options are encapsulated + by the `options` object. + + For details, see :cpp:func:`write_csv`. + + Parameters + ---------- + options: OrcWriterOptions + Settings for controlling writing behavior + + Returns + ------- + None + """ + with nogil: + cpp_write_orc(move(options.c_obj)) + + +cdef class OrcChunkedWriter: + cpdef void close(self): + """ + Closes the chunked ORC writer. + + Returns + ------- + None + """ + with nogil: + self.c_obj.get()[0].close() + + cpdef void write(self, Table table): + """ + Writes table to output. + + Parameters + ---------- + table: Table + able that needs to be written + + Returns + ------- + None + """ + with nogil: + self.c_obj.get()[0].write(table.view()) + + @staticmethod + def from_options(ChunkedOrcWriterOptions options): + """ + Creates a chunked ORC writer from options + + Parameters + ---------- + options: ChunkedOrcWriterOptions + Settings for controlling writing behavior + + Returns + ------- + OrcChunkedWriter + """ + cdef OrcChunkedWriter orc_writer = OrcChunkedWriter.__new__( + OrcChunkedWriter + ) + orc_writer.c_obj.reset(new orc_chunked_writer(options.c_obj)) + return orc_writer + + +cdef class ChunkedOrcWriterOptions: + cpdef void set_stripe_size_bytes(self, size_t size_bytes): + """ + Sets the maximum stripe size, in bytes. + + Parameters + ---------- + size_bytes: size_t + Sets the maximum stripe size, in bytes. + + Returns + ------- + None + """ + self.c_obj.set_stripe_size_bytes(size_bytes) + + cpdef void set_stripe_size_rows(self, size_type size_rows): + """ + Sets the maximum stripe size, in rows. + + If the stripe size is smaller that the row group size, + row group size will be reduced to math the stripe size. + + Parameters + ---------- + size_bytes: size_type + Maximum stripe size, in rows to be set + + Returns + ------- + None + """ + self.c_obj.set_stripe_size_rows(size_rows) + + cpdef void set_row_index_stride(self, size_type stride): + """ + Sets the row index stride. + + Rounded down to a multiple of 8. + + Parameters + ---------- + size_bytes: size_type + Maximum stripe size, in rows to be set + + Returns + ------- + None + """ + self.c_obj.set_row_index_stride(stride) + + @staticmethod + def builder(SinkInfo sink): + """ + Create builder to create ChunkedOrcWriterOptions. + + Parameters + ---------- + sink: SinkInfo + The sink used for writer output + table: Table + Table to be written to output + + Returns + ------- + ChunkedOrcWriterOptionsBuilder + """ + cdef ChunkedOrcWriterOptionsBuilder orc_builder = \ + ChunkedOrcWriterOptionsBuilder.__new__( + ChunkedOrcWriterOptionsBuilder + ) + orc_builder.c_obj = chunked_orc_writer_options.builder(sink.c_obj) + orc_builder.sink = sink + return orc_builder + + +cdef class ChunkedOrcWriterOptionsBuilder: + cpdef ChunkedOrcWriterOptionsBuilder compression(self, compression_type comp): + """ + Sets compression type. + + Parameters + ---------- + comp: CompressionType + The compression type to use + + Returns + ------- + ChunkedOrcWriterOptionsBuilder + """ + self.c_obj.compression(comp) + return self + + cpdef ChunkedOrcWriterOptionsBuilder enable_statistics(self, statistics_freq val): + """ + Choose granularity of column statistics to be written. + + Parameters + ---------- + val: StatisticsFreq + Level of statistics collection + + Returns + ------- + ChunkedOrcWriterOptionsBuilder + """ + self.c_obj.enable_statistics(val) + return self + + cpdef ChunkedOrcWriterOptionsBuilder key_value_metadata( + self, + dict kvm + ): + """ + Sets Key-Value footer metadata. + + Parameters + ---------- + kvm: dict + Key-Value footer metadata + + Returns + ------- + ChunkedOrcWriterOptionsBuilder + """ + self.c_obj.key_value_metadata( + {key.encode(): value.encode() for key, value in kvm.items()} + ) + return self + + cpdef ChunkedOrcWriterOptionsBuilder metadata(self, TableInputMetadata meta): + """ + Sets associated metadata. + + Parameters + ---------- + meta: TableInputMetadata + Associated metadata + + Returns + ------- + ChunkedOrcWriterOptionsBuilder + """ + self.c_obj.metadata(meta.c_obj) + return self + + cpdef ChunkedOrcWriterOptions build(self): + """Create a OrcWriterOptions object""" + cdef ChunkedOrcWriterOptions orc_options = ChunkedOrcWriterOptions.__new__( + ChunkedOrcWriterOptions + ) + orc_options.c_obj = move(self.c_obj.build()) + orc_options.sink = self.sink + return orc_options diff --git a/python/pylibcudf/pylibcudf/io/types.pxd b/python/pylibcudf/pylibcudf/io/types.pxd index 90b43cf0ff5..a1f3b17936c 100644 --- a/python/pylibcudf/pylibcudf/io/types.pxd +++ b/python/pylibcudf/pylibcudf/io/types.pxd @@ -3,6 +3,7 @@ from libc.stdint cimport uint8_t, int32_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.vector cimport vector +from libcpp cimport bool from pylibcudf.libcudf.io.data_sink cimport data_sink from pylibcudf.libcudf.io.types cimport ( column_encoding, @@ -22,16 +23,16 @@ from pylibcudf.libcudf.io.types cimport ( ) from pylibcudf.libcudf.types cimport size_type from pylibcudf.table cimport Table - +from pylibcudf.libcudf.types cimport size_type cdef class PartitionInfo: cdef partition_info c_obj cdef class ColumnInMetadata: - cdef column_in_metadata c_obj + cdef column_in_metadata* c_obj + cdef TableInputMetadata owner - @staticmethod - cdef ColumnInMetadata from_metadata(column_in_metadata metadata) + cdef TableInputMetadata table cpdef ColumnInMetadata set_name(self, str name) @@ -43,7 +44,7 @@ cdef class ColumnInMetadata: cpdef ColumnInMetadata set_int96_timestamps(self, bool req) - cpdef ColumnInMetadata set_decimal_precision(self, uint8_t req) + cpdef ColumnInMetadata set_decimal_precision(self, uint8_t precision) cpdef ColumnInMetadata child(self, size_type i) @@ -57,8 +58,14 @@ cdef class ColumnInMetadata: cpdef str get_name(self) + @staticmethod + cdef ColumnInMetadata from_libcudf( + column_in_metadata* metadata, TableInputMetadata owner + ) + cdef class TableInputMetadata: cdef table_input_metadata c_obj + cdef list column_metadata cdef class TableWithMetadata: cdef public Table tbl diff --git a/python/pylibcudf/pylibcudf/io/types.pyi b/python/pylibcudf/pylibcudf/io/types.pyi index 04f276cfeee..a3a559219ff 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyi +++ b/python/pylibcudf/pylibcudf/io/types.pyi @@ -3,7 +3,7 @@ import io import os from collections.abc import Mapping from enum import IntEnum -from typing import Any, Literal, TypeAlias, overload +from typing import Any, Literal, Self, TypeAlias, overload from pylibcudf.column import Column from pylibcudf.io.datasource import Datasource @@ -66,16 +66,16 @@ class TableInputMetadata: def __init__(self, table: Table): ... class ColumnInMetadata: - def set_name(self, name: str) -> ColumnInMetadata: ... - def set_nullability(self, nullable: bool) -> ColumnInMetadata: ... - def set_list_column_as_map(self) -> ColumnInMetadata: ... - def set_int96_timestamps(self, req: bool) -> ColumnInMetadata: ... - def set_decimal_precision(self, precision: int) -> ColumnInMetadata: ... - def child(self, i: int) -> ColumnInMetadata: ... - def set_output_as_binary(self, binary: bool) -> ColumnInMetadata: ... - def set_type_length(self, type_length: int) -> ColumnInMetadata: ... - def set_skip_compression(self, skip: bool) -> ColumnInMetadata: ... - def set_encoding(self, encoding: ColumnEncoding) -> ColumnInMetadata: ... + def set_name(self, name: str) -> Self: ... + def set_nullability(self, nullable: bool) -> Self: ... + def set_list_column_as_map(self) -> Self: ... + def set_int96_timestamps(self, req: bool) -> Self: ... + def set_decimal_precision(self, precision: int) -> Self: ... + def child(self, i: int) -> Self: ... + def set_output_as_binary(self, binary: bool) -> Self: ... + def set_type_length(self, type_length: int) -> Self: ... + def set_skip_compression(self, skip: bool) -> Self: ... + def set_encoding(self, encoding: ColumnEncoding) -> Self: ... def get_name(self) -> str: ... class TableWithMetadata: diff --git a/python/pylibcudf/pylibcudf/io/types.pyx b/python/pylibcudf/pylibcudf/io/types.pyx index 460ab6844c3..a2155829f2c 100644 --- a/python/pylibcudf/pylibcudf/io/types.pyx +++ b/python/pylibcudf/pylibcudf/io/types.pyx @@ -2,7 +2,6 @@ from cpython.buffer cimport PyBUF_READ from cpython.memoryview cimport PyMemoryView_FromMemory -from libc.stdint cimport uint8_t, int32_t from libcpp cimport bool from libcpp.memory cimport unique_ptr from libcpp.string cimport string @@ -20,6 +19,8 @@ from pylibcudf.libcudf.io.types cimport ( source_info, table_input_metadata, table_with_metadata, + column_in_metadata, + table_input_metadata, ) from pylibcudf.libcudf.types cimport size_type @@ -38,9 +39,14 @@ from pylibcudf.libcudf.io.types import ( quote_style as QuoteStyle, # no-cython-lint statistics_freq as StatisticsFreq, # no-cython-lint ) +from cython.operator cimport dereference +from pylibcudf.libcudf.types cimport size_type +from cython.operator cimport dereference +from pylibcudf.libcudf.types cimport size_type __all__ = [ "ColumnEncoding", + "ColumnInMetadata", "CompressionType", "DictionaryPolicy", "JSONRecoveryMode", @@ -74,18 +80,30 @@ cdef class ColumnInMetadata: Metadata for a column """ + def __init__(self): + raise ValueError( + "ColumnInMetadata should not be constructed directly. " + "Use one of the factories." + ) + @staticmethod - cdef ColumnInMetadata from_metadata(column_in_metadata metadata): + cdef ColumnInMetadata from_libcudf( + column_in_metadata* metadata, TableInputMetadata owner + ): """ - Construct a ColumnInMetadata. + A Python representation of `column_in_metadata`. Parameters ---------- - metadata : column_in_metadata - """ - cdef ColumnInMetadata col_metadata = ColumnInMetadata.__new__(ColumnInMetadata) - col_metadata.c_obj = metadata - return col_metadata + metadata : column_in_metadata* + Raw pointer to C++ metadata. + owner : TableInputMetadata + Owning table input metadata that manages lifetime of the raw pointer. + """ + cdef ColumnInMetadata out = ColumnInMetadata.__new__(ColumnInMetadata) + out.c_obj = metadata + out.owner = owner + return out cpdef ColumnInMetadata set_name(self, str name): """ @@ -100,7 +118,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_name(name.encode()) + dereference(self.c_obj).set_name(name.encode()) return self cpdef ColumnInMetadata set_nullability(self, bool nullable): @@ -116,7 +134,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_nullability(nullable) + dereference(self.c_obj).set_nullability(nullable) return self cpdef ColumnInMetadata set_list_column_as_map(self): @@ -128,7 +146,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_list_column_as_map() + dereference(self.c_obj).set_list_column_as_map() return self cpdef ColumnInMetadata set_int96_timestamps(self, bool req): @@ -145,7 +163,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_int96_timestamps(req) + dereference(self.c_obj).set_int96_timestamps(req) return self cpdef ColumnInMetadata set_decimal_precision(self, uint8_t precision): @@ -162,7 +180,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_decimal_precision(precision) + dereference(self.c_obj).set_decimal_precision(precision) return self cpdef ColumnInMetadata child(self, size_type i): @@ -178,7 +196,8 @@ cdef class ColumnInMetadata: ------- ColumnInMetadata """ - return ColumnInMetadata.from_metadata(self.c_obj.child(i)) + cdef column_in_metadata* child_c_obj = &dereference(self.c_obj).child(i) + return ColumnInMetadata.from_libcudf(child_c_obj, self.owner) cpdef ColumnInMetadata set_output_as_binary(self, bool binary): """ @@ -193,7 +212,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_output_as_binary(binary) + dereference(self.c_obj).set_output_as_binary(binary) return self cpdef ColumnInMetadata set_type_length(self, int32_t type_length): @@ -209,7 +228,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_type_length(type_length) + dereference(self.c_obj).set_type_length(type_length) return self cpdef ColumnInMetadata set_skip_compression(self, bool skip): @@ -226,7 +245,7 @@ cdef class ColumnInMetadata: ------- Self """ - self.c_obj.set_skip_compression(skip) + dereference(self.c_obj).set_skip_compression(skip) return self cpdef ColumnInMetadata set_encoding(self, column_encoding encoding): @@ -243,7 +262,7 @@ cdef class ColumnInMetadata: ------- ColumnInMetadata """ - self.c_obj.set_encoding(encoding) + dereference(self.c_obj).set_encoding(encoding) return self cpdef str get_name(self): @@ -255,7 +274,7 @@ cdef class ColumnInMetadata: str The name of this column """ - return self.c_obj.get_name().decode() + return dereference(self.c_obj).get_name().decode() cdef class TableInputMetadata: @@ -269,6 +288,10 @@ cdef class TableInputMetadata: """ def __init__(self, Table table): self.c_obj = table_input_metadata(table.view()) + self.column_metadata = [ + ColumnInMetadata.from_libcudf(&self.c_obj.column_metadata[i], self) + for i in range(self.c_obj.column_metadata.size()) + ] cdef class TableWithMetadata: diff --git a/python/pylibcudf/pylibcudf/tests/io/test_orc.py b/python/pylibcudf/pylibcudf/tests/io/test_orc.py index 5ed660ba6cf..2557e40c935 100644 --- a/python/pylibcudf/pylibcudf/tests/io/test_orc.py +++ b/python/pylibcudf/pylibcudf/tests/io/test_orc.py @@ -1,4 +1,5 @@ # Copyright (c) 2024, NVIDIA CORPORATION. + import pyarrow as pa import pytest from utils import _convert_types, assert_table_and_meta_eq, make_source @@ -52,3 +53,64 @@ def test_read_orc_basic( ) assert_table_and_meta_eq(pa_table, res, check_field_nullability=False) + + +@pytest.mark.parametrize( + "compression", + [ + plc.io.types.CompressionType.NONE, + plc.io.types.CompressionType.SNAPPY, + ], +) +@pytest.mark.parametrize( + "statistics", + [ + plc.io.types.StatisticsFreq.STATISTICS_NONE, + plc.io.types.StatisticsFreq.STATISTICS_COLUMN, + ], +) +@pytest.mark.parametrize("stripe_size_bytes", [None, 65536]) +@pytest.mark.parametrize("stripe_size_rows", [None, 512]) +@pytest.mark.parametrize("row_index_stride", [None, 512]) +def test_roundtrip_pa_table( + compression, + statistics, + stripe_size_bytes, + stripe_size_rows, + row_index_stride, + tmp_path, +): + pa_table = pa.table({"a": [1.0, 2.0, None], "b": [True, None, False]}) + plc_table = plc.interop.from_arrow(pa_table) + + tmpfile_name = tmp_path / "test.orc" + + sink = plc.io.SinkInfo([str(tmpfile_name)]) + + tbl_meta = plc.io.types.TableInputMetadata(plc_table) + user_data = {"a": "", "b": ""} + options = ( + plc.io.orc.OrcWriterOptions.builder(sink, plc_table) + .metadata(tbl_meta) + .key_value_metadata(user_data) + .compression(compression) + .enable_statistics(statistics) + .build() + ) + if stripe_size_bytes is not None: + options.set_stripe_size_bytes(stripe_size_bytes) + if stripe_size_rows is not None: + options.set_stripe_size_rows(stripe_size_rows) + if row_index_stride is not None: + options.set_row_index_stride(row_index_stride) + + plc.io.orc.write_orc(options) + + read_table = pa.orc.read_table(str(tmpfile_name)) + + res = plc.io.types.TableWithMetadata( + plc.interop.from_arrow(read_table), + [(name, []) for name in pa_table.schema.names], + ) + + assert_table_and_meta_eq(pa_table, res, check_field_nullability=False) diff --git a/python/pylibcudf/pylibcudf/tests/io/test_types.py b/python/pylibcudf/pylibcudf/tests/io/test_types.py new file mode 100644 index 00000000000..a7642556bf2 --- /dev/null +++ b/python/pylibcudf/pylibcudf/tests/io/test_types.py @@ -0,0 +1,28 @@ +# Copyright (c) 2024, NVIDIA CORPORATION. + +import gc +import weakref + +import pyarrow as pa + +import pylibcudf as plc + + +def test_gc_with_table_and_column_input_metadata(): + class Foo(plc.io.types.TableInputMetadata): + def __del__(self): + pass + + pa_table = pa.table( + {"a": pa.array([1, 2, 3]), "b": pa.array(["a", "b", "c"])} + ) + plc_table = plc.interop.from_arrow(pa_table) + + tbl_meta = Foo(plc_table) + weak_tbl_meta = weakref.ref(tbl_meta) + + del tbl_meta + + gc.collect() + + assert weak_tbl_meta() is None